Spark SelectDB Connector
快速介绍
Spark Selectdb Connector支持将上游的大数据量写入到SelectDB Cloud中。
实现原理
Spark SelectDB Connector 底层实现依赖于SelectDB Cloud的stage导入方式,通过调用SelectDB Cloud api (/copy/upload),返回一个重定向的对象存储地址,使用http的方式向对象存储地址发送字节流,最后通过copy into(/copyinto)的方式将对象存储桶中的数据导入到SelectDB Cloud中。Stage导入方式的具体使用可以参阅官网Stage导入 - 云原生实时数据仓库。
版本支持
Spark2.3/3.1/3.2
使用
Maven引用
<dependency>
<groupId>com.selectdb</groupId>
<artifactId>spark-selectdb-connector-3.2_2.12</artifactId>
<version>1.0.0</version>
</dependency>
更多版本可参考这里 (opens in a new tab)
Runtime Jar
也可在这里 (opens in a new tab)直接下载jar包。将下载的jar包复制到 Spark
的 ClassPath
中即可使用 spark-selectdb-connector
。例如,Local
模式运行的 Spark
,将此文件放入 jars/
文件夹下。Yarn
集群模式运行的Spark
,则将此文件放入预部署包中。例如将 spark-selectdb-connector-2.3.4-2.11-1.0.0.jar
上传到 hdfs并在spark.yarn.jars参数上添加 hdfs上的Jar包路径
- 上传
spark-selectdb-connector-2.3.4-2.11-1.0.0.jar
到hdfs。
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0.0.jar /spark-jars/
- 在集群中添加
spark-selectdb-connector-3.1.2-2.12-1.0.0.jar
依赖。
spark.yarn.jars=hdfs:///spark-jars/doris-spark-connector-3.1.2-2.12-1.0.0.jar
使用示例
通过sparksql的方式写入
val selectdbHttpPort = "127.0.0.1:47968"
val selectdbJdbc = "jdbc:mysql://127.0.0.1:18836/test"
val selectdbUser = "admin"
val selectdbPwd = "selectdb2022"
val selectdbTable = "test.test_order"
CREATE TEMPORARY VIEW test_order
USING selectdb
OPTIONS(
"table.identifier"="test.test_order",
"jdbc.url"="${selectdbJdbc}",
"http.port"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.file.type"="json"
);
insert into test_order select order_id,order_amount,order_status from tmp_tb ;
通过DataFrame方式写入
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "待付款"),
("2", 200, null),
("3", 300, "已收货")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("selectdb")
.option("selectdb.http.port", selectdbHttpPort)
.option("selectdb.table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 4)
.option("sink.max-retries", 2)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()
配置
Key | DefaultValue | Comment | Required |
---|---|---|---|
selectdb.http.port | -- | selectdb cloud http地址 | Y |
selectdb.jdbc.url | -- | selectdb cloud jdbc地址,此配置为spark sql所属 | Y |
selectdb.table.identifier | -- | selectdb cloud表名,格式 库名.表名,例如:db1.tbl1 | Y |
user | -- | 访问selectdb cloud的用户名 | Y |
password | -- | 访问selectdb cloud的密码 | Y |
sink.batch.size | 100000 | 单次写selectdb cloud的最大行数 | N |
sink.max-retries | 3 | 写selectdb失败之后的重试次数 | N |
sink.properties.* | -- | copy into的导入参数。例如:"sink.properties.file.type"="json"关于copy into更多的参数说明,请参阅selectdb官网copy into章节。 | N |