Flink SelectDB Connector
The Flink SelectDB Connector supports writing upstream data to SelectDB through the Flink DataStream API and Flink SQL.
Connector v2.0.0
Supported Versions
Flink 1.13-1.17
Usage
Maven Dependency
<dependency>
    <groupId>com.selectdb</groupId>
    <artifactId>flink-selectdb-connector-1.16</artifactId>
    <version>2.0.0</version>
</dependency>
For more versions, see here.
Runtime Jar
You can also download the jar directly here and place it under FLINK_HOME/lib to use it.
How to Use
FlinkSQL
-- enable checkpointing
SET 'execution.checkpointing.interval' = '30s';
-- disable operator chaining (splits the sink's Writer and Committer operators so the load can run asynchronously)
SET 'pipeline.operator-chaining' = 'false';
CREATE TABLE selectdb_sink (
name STRING,
age INT
)
WITH (
'connector' = 'selectdb',
'load-url' = 'xxx.privatelink.aliyun.com:47057',
'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
'cluster-name' = 'clustername',
'table.identifier' = 'database.table',
'username' = 'admin',
'password' = '',
-- CSV write
-- 'sink.properties.file.column_separator' = '\x01',
-- 'sink.properties.file.line_delimiter' = '\x02',
-- JSON write
'sink.properties.file.type' = 'json',
'sink.properties.file.strip_outer_array' = 'false',
-- synchronize delete events
'sink.enable-delete' = 'false'
);
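Once the sink table is defined, data is written with a regular INSERT statement and the load is committed at checkpoints. Below is a minimal sketch; the literal rows are illustrative only, and in practice the INSERT would usually select from an upstream source table.
-- minimal write example (illustrative values) against the sink table defined above
INSERT INTO selectdb_sink VALUES ('selectdb', 1), ('flink', 2);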
DataStream
// enable checkpoint
env.enableCheckpointing(10000);
SelectdbSink.Builder<String> builder = SelectdbSink.builder();
SelectdbOptions.Builder selectdbBuilder = SelectdbOptions.builder();
selectdbBuilder.setLoadUrl("ip:httpPort")
.setJdbcUrl("ip:jdbcPort")
.setClusterName("clustername")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("password");
Properties properties = new Properties();
// CSV write
properties.setProperty("file.column_separator", ",");
properties.setProperty("file.line_delimiter", "\n");
properties.setProperty("file.type", "csv");
// JSON write
//properties.setProperty("file.strip_outer_array", "false");
//properties.setProperty("file.type", "json");
// sync from Kafka or another source whose data already conforms to the expected format
SelectdbExecutionOptions.Builder executionBuilder = SelectdbExecutionOptions.builder();
executionBuilder.setLoadProps(properties);
builder.setSelectdbExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer()) // serialize records as plain strings
.setSelectdbOptions(selectdbBuilder.build());
// sync from a CDC source
//SelectdbExecutionOptions.Builder executionBuilder = SelectdbExecutionOptions.builder();
//executionBuilder.setLoadProps(properties).setDeletable(true);
//
//builder.setSelectdbExecutionOptions(executionBuilder.build())
// .setSerializer(JsonDebeziumSchemaSerializer.builder().setSelectdbOptions(selectdbBuilder.build()).build())
// .setSelectdbOptions(selectdbBuilder.build());
//mock csv string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("selectdb",1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "," + t.f1)
.sinkTo(builder.build())
.disableChaining(); // split the sink's Writer and Committer operators so the load can run asynchronously
Configuration
Key | Default Value | Required | Description |
---|---|---|---|
load-url | - | Y | SelectDB load URL, e.g. host:httpPort |
jdbc-url | - | Y | SelectDB query URL, e.g. host:jdbcPort |
cluster-name | - | Y | SelectDB cluster name; a warehouse may contain multiple clusters, choose the one you need |
table.identifier | - | Y | Target table to write to, e.g. db.table |
username | - | Y | Username |
password | - | Y | Password |
sink.properties | - | Y | Write properties. CSV write: sink.properties.file.type='csv' sink.properties.file.column_separator=',' sink.properties.file.line_delimiter='\n' JSON write: sink.properties.file.type='json' sink.properties.file.strip_outer_array='false' |
sink.buffer-size | 5242880 (5MB) | N | Maximum buffer size in bytes; once exceeded, the buffer is flushed to object storage. Default 5MB. |
sink.buffer-count | 10000 | N | Maximum number of buffered rows; once exceeded, the buffer is flushed to object storage. Default 10000. |
sink.max-retries | 3 | N | Maximum number of retries in the Commit phase (Copy Into execution). Default 3. |
sink.enable-delete | false | N | Whether to synchronize delete events. Default false. |
sink.flush.queue-size | 1 | N | Size of the buffering queue used when uploading to object storage. |
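As a sketch of how the optional options above fit together, the buffering and retry behavior can be overridden in the WITH clause alongside the required options. The table name and threshold values below are illustrative, not recommendations:
-- hypothetical table name; threshold values shown only to illustrate the option keys
CREATE TABLE selectdb_sink_tuned (
name STRING,
age INT
)
WITH (
'connector' = 'selectdb',
'load-url' = 'xxx.privatelink.aliyun.com:47057',
'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
'cluster-name' = 'clustername',
'table.identifier' = 'database.table',
'username' = 'admin',
'password' = '',
'sink.properties.file.type' = 'json',
'sink.properties.file.strip_outer_array' = 'false',
-- optional tuning (illustrative values)
'sink.buffer-size' = '10485760',   -- flush to object storage after 10MB
'sink.buffer-count' = '20000',     -- or after 20000 buffered rows
'sink.max-retries' = '5',          -- retry Copy Into up to 5 times in the Commit phase
'sink.flush.queue-size' = '2'      -- allow two buffers queued for upload
);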
Example: Syncing multiple tables or a whole database with Flink CDC
Syntax
<FLINK_HOME>bin/flink run \
-c com.selectdb.flink.tools.cdc.CdcTools \
lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
--database <selectdb-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <selectdb-table-prefix>] \
[--table-suffix <selectdb-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--sink-conf <selectdb-sink-conf> [--sink-conf <selectdb-sink-conf> ...] \
[--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]
- --job-name Flink job name, optional.
- --database Name of the target database in SelectDB.
- --table-prefix Prefix for SelectDB table names, e.g. --table-prefix ods_.
- --table-suffix Same as above, but a suffix for SelectDB table names.
- --including-tables MySQL tables to sync. Multiple tables can be separated by "|", and regular expressions are supported. For example, --including-tables table1|tbl.* syncs table1 and every table whose name starts with tbl.
- --excluding-tables Tables to exclude from the sync; same usage as above.
- --mysql-conf MySQL CDC source configuration, e.g. --mysql-conf hostname=127.0.0.1. All MySQL-CDC configuration options can be found here; hostname/username/password/database-name are required.
- --oracle-conf Oracle CDC source configuration, e.g. --oracle-conf hostname=127.0.0.1. All Oracle-CDC configuration options can be found here; hostname/username/password/database-name/schema-name are required.
- --sink-conf All configuration options of the SelectDB sink; see the complete list of options above.
- --table-conf Configuration options of the SelectDB table, i.e. the contents of its properties.
Note:
When syncing, add the corresponding Flink CDC dependencies under $FLINK_HOME/lib, e.g. flink-sql-connector-mysql-cdc-${version}.jar, flink-sql-connector-oracle-cdc-${version}.jar.
Whole-database sync is supported on Flink 1.15 and later. Connector snapshot packages can be downloaded here.
MySQL sync example
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.selectdb.flink.tools.cdc.CdcTools \
lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
--sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf cluster-name=Cluster01
Oracle sync example
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.selectdb.flink.tools.cdc.CdcTools \
lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|test.*" \
--sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
--sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf cluster-name=Cluster01
PostgreSQL sync example
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.selectdb.flink.tools.cdc.CdcTools \
lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
postgres-sync-database \
--database db1 \
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|test.*" \
--sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
--sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf cluster-name=Cluster01
SQL Server sync example
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.selectdb.flink.tools.cdc.CdcTools \
lib/flink-selectdb-connector-1.17-2.1.0-SNAPSHOT.jar \
sqlserver-sync-database \
--database db1 \
--sqlserver-conf hostname=127.0.0.1 \
--sqlserver-conf port=1433 \
--sqlserver-conf username=sa \
--sqlserver-conf password="123456" \
--sqlserver-conf database-name=CDC_DB \
--sqlserver-conf schema-name=dbo \
--including-tables "tbl1|test.*" \
--sink-conf load-url=xxx.privatelink.aliyun.com:47057 \
--sink-conf jdbc-url=xxx.privatelink.aliyun.com:21477 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf cluster-name=Cluster01
Connector v1.0.0
Supported Versions
Flink | Java | Runtime Jar |
---|---|---|
1.13 | 8 | Scala 2.11: flink-selectdb-connector-1.13_2.11-1.0.0; Scala 2.12: flink-selectdb-connector-1.13_2.12-1.0.0 |
1.14 | 8 | Scala 2.11: flink-selectdb-connector-1.14_2.11-1.0.0; Scala 2.12: flink-selectdb-connector-1.14_2.12-1.0.0 |
1.15, 1.16, 1.17 | 8 | flink-selectdb-connector-1.0.0 |
How to Use
Flink 1.13
FlinkSQL
CREATE TABLE selectdb_sink (
name STRING,
age INT
)
WITH (
'connector' = 'selectdb',
'load-url' = 'xxx.privatelink.aliyun.com:47057',
'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
'cluster-name' = 'clustername',
'table.identifier' = 'database.table',
'username' = 'admin',
'password' = '',
'sink.properties.file.type' = 'json',
'sink.properties.file.strip_outer_array' = 'true'
);
DataStream
Properties pro = new Properties();
pro.setProperty("file.type", "json");
pro.setProperty("file.strip_outer_array", "true");
env.fromElements("{\"name\": \"zhangsan\", \"age\": \"1\"}")
.addSink(
DorisSink.sink(
DorisExecutionOptions.builder()
.setStreamLoadProp(pro).build(),
DorisOptions.builder()
.setLoadUrl("xxx.privatelink.aliyun.com:47057")
.setJdbcUrl("xxx.privatelink.aliyun.com:30523")
.setClusterName("clustername")
.setTableIdentifier("database.tablename")
.setUsername("admin")
.setPassword("").build()
));
Flink 1.14+
FlinkSQL
-- enable checkpointing
SET 'execution.checkpointing.interval' = '30s';
-- disable operator chaining (splits the sink's Writer and Committer operators so the load can run asynchronously)
SET 'pipeline.operator-chaining' = 'false';
CREATE TABLE selectdb_sink (
name STRING,
age INT
)
WITH (
'connector' = 'selectdb',
'load-url' = 'xxx.privatelink.aliyun.com:47057',
'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
'cluster-name' = 'clustername',
'table.identifier' = 'database.table',
'username' = 'admin',
'password' = '',
-- CSV write
-- 'sink.properties.file.column_separator' = '\x01',
-- 'sink.properties.file.line_delimiter' = '\x02',
-- JSON write
'sink.properties.file.type' = 'json',
'sink.properties.file.strip_outer_array' = 'false',
-- synchronize delete events
'sink.enable-delete' = 'true'
);
DataStream
// enable checkpoint
env.enableCheckpointing(30000);
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setLoadUrl("ip:httpPort")
.setJdbcUrl("ip:jdbcPort")
.setClusterName("clustername")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("password");
Properties properties = new Properties();
// CSV write
properties.setProperty("file.column_separator", ",");
properties.setProperty("file.line_delimiter", "\n");
properties.setProperty("file.type", "csv");
// JSON write
//properties.setProperty("file.strip_outer_array", "false");
//properties.setProperty("file.type", "json");
// sync from Kafka or another source whose data already conforms to the expected format
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setStreamLoadProp(properties);
builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer()) // serialize records as plain strings
.setDorisOptions(dorisBuilder.build());
// sync from a CDC source
//DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
//executionBuilder.setStreamLoadProp(properties).setDeletable(true);
//
//builder.setDorisExecutionOptions(executionBuilder.build())
// .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
// .setDorisOptions(dorisBuilder.build());
//mock csv string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "," + t.f1)
.sinkTo(builder.build())
.disableChaining(); // split the sink's Writer and Committer operators so the load can run asynchronously
Configuration
Key | Default Value | Required | Description |
---|---|---|---|
load-url | - | Y | SelectDB load URL, e.g. host:httpPort |
jdbc-url | - | Y | SelectDB query URL, e.g. host:jdbcPort |
cluster-name | - | Y | SelectDB cluster name |
table.identifier | - | Y | Target table to write to, e.g. db.table |
username | - | Y | Username |
password | - | Y | Password |
sink.properties | - | Y | Write properties. CSV write: sink.properties.file.type='csv' sink.properties.file.column_separator=',' sink.properties.file.line_delimiter='\n' JSON write: sink.properties.file.type='json' sink.properties.file.strip_outer_array='false' (true on Flink 1.13) |
sink.enable-delete | false | N | Whether to synchronize delete events. Default false. |
Flink 1.13 parameters
Key | Default Value | Required | Description |
---|---|---|---|
sink.batch.size | 10000 | N | Maximum number of rows per flush |
sink.max-retries | 3 | N | Number of retries after a failed flush |
sink.batch.interval | 10s | N | Flush interval. Default 10s. Supports time units ms/s/min/h/d (milliseconds if no unit is given). Set to 0 to disable periodic flushing. |
sink.batch.bytes | 10485760 (10MB) | N | Maximum number of bytes per flush. Default 10MB. |
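On Flink 1.13 these batch thresholds are set in the same way as the other sink options. A hedged Flink SQL sketch follows, mirroring the Flink 1.13 example shown earlier; the table name and values are illustrative only:
-- hypothetical table name; values shown only to illustrate the Flink 1.13 batch option keys
CREATE TABLE selectdb_sink_113 (
name STRING,
age INT
)
WITH (
'connector' = 'selectdb',
'load-url' = 'xxx.privatelink.aliyun.com:47057',
'jdbc-url' = 'xxx.privatelink.aliyun.com:30523',
'cluster-name' = 'clustername',
'table.identifier' = 'database.table',
'username' = 'admin',
'password' = '',
'sink.properties.file.type' = 'json',
'sink.properties.file.strip_outer_array' = 'true',
-- optional batch tuning (illustrative values)
'sink.batch.size' = '50000',       -- flush after 50000 rows
'sink.batch.bytes' = '20971520',   -- or after 20MB
'sink.batch.interval' = '30s',     -- or every 30 seconds
'sink.max-retries' = '5'           -- retry a failed flush up to 5 times
);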
Flink 1.14+ parameters
Key | Default Value | Required | Description |
---|---|---|---|
sink.buffer-size | 1024*1024 (1MB) | N | Size of the write buffer, in bytes. Default 1MB; changing it is not recommended. |
sink.buffer-count | 3 | N | Number of write buffers. Default 3; changing it is not recommended. |
sink.max-retries | 1 | N | Maximum number of retries in the Commit phase. Default 1. |
sink.check-interval | 10000 | N | Interval for periodically writing files, in milliseconds. Default 10 seconds; changing it is not recommended. |