跳到主要内容

Stream Load 导入

本文介绍如何通过 Stream Load 导入数据至 SelectDB Cloud 仓库中。

背景信息

Stream Load 是属于同步接口的导入方式,用户通过发送 HTTP 请求将本地文件或数据流导入到 SelectDB Cloud 仓库中。Stream Load 执行并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。

Stream Load 主要适用于导入本地文件或通过程序导入数据流中的数据,支持的数据格式包括:CSV(文本)、JSON、PARQUET 和 ORC。

创建导入

Stream Load 通过 HTTP 协议提交和传输数据,这里通过 curl 命令展示如何提交导入。用户也可以通过其他 HTTP Client 进行操作。

语法

# Header 中支持的属性,请参见下面的参数说明。
# 格式为: -H "key1:value1"。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://host:port/api/{db}/{table}/_stream_load

参数说明

参数名称参数说明
--location-trusted当需要认证时,会将 user 和 password 传递给被重定向到的服务器。
-u用户名和密码。
-T需要导入的数据文件。
-XPUTHTTP 请求的 Method,采用 PUT 请求方法。其中 host 为 SelectDB Cloud 仓库的私网地址或公网地址,port 为 HTTP 端口号。

由于 Stream Load 使用的是 HTTP 协议,所以导入任务有关的参数主要设置在 Header 中。常用的导入参数如下。

参数名称参数说明
label导入任务的唯一标识。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。label 也可用于防止用户重复导入相同的数据,强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once。当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。
format指定导入数据格式,支持 CSV、JSON、PARQUET、ORC,默认值为 CSV。支持 csv_with_names(CSV 文件行首过滤)、csv_with_names_and_types(CSV 文件前两行过滤)。
line_delimiter用于指定导入文件中的换行符,默认为 \n。您可以使用做多个字符的组合作为换行符。
column_separator用于指定导入文件中的列分隔符,默认为 \t。您可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加 \x 作为前缀,使用十六进制来表示分隔符。如Hive文件的分隔符 \x01,需要指定为 -H "column_separator:\x01"。
compress_type指定文件的压缩格式。目前只支持 CSV 文件的压缩,支持 gz、lzo、bz2、lz4、lzop、deflate 压缩格式。
max_filter_ratio导入任务的最大容忍率,默认为 0 容忍,取值范围是 0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
cloud_cluster用于指定导入使用的集群。如果不指定,则使用用户的默认集群,如果用户没有设置默认集群,则自动为用户选择一个有权限的集群。
where导入任务指定的过滤条件。支持对原始数据指定 where 语句进行过滤,被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入 num_rows_unselected。
partitions待导入数据的 Partition 信息。如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL。
columns待导入数据的函数变换配置。支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
merge_type数据合并类型,默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 表模型。其中 MERGE 类型需要配合 delete 参数使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。
delete仅在 MERGE 下有意义,表示数据的删除条件 function_column.sequence_col:只适用于 UNIQUE_KEYS,相同 key 列下,保证 value 列按照 source_sequence 列进行 REPLACE, source_sequence 可以是数据源中的列,也可以是表结构中的一列。
exec_mem_limit导入内存限制。默认为 2 GB,单位为字节。
timeout指定导入的超时时间,单位:秒,默认是 600 秒。可设置范围为 1~259200 秒。
timezone指定本次导入所使用的时区,默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。
two_phase_commit是否开启两阶段事务提交模式,默认为 false。开启两阶段事务提交模式后,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为 PRECOMMITTED,用户手动触发 commit 操作之后,数据才可见。

由于 Stream Load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户,返回结果样例如下。

{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}

返回结果参数说明如下。

参数名称参数说明
TxnId导入的事务 ID,用户可不感知。
Label导入 Label,由用户指定或系统自动生成。
Status导入状态。"Success" 表示导入成功。"Publish Timeout" 表示导入已经完成,只是数据可能会延迟可见,无需重试。"Label Already Exists" 表示 Label 重复,需更换 Label。"Fail" 表示导入失败。
ExistingJobStatus已存在的 Label 对应的导入作业的状态。这个字段只有在当 Status 为 "Label Already Exists" 时才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。"RUNNING" 表示作业还在执行。"FINISHED" 表示作业成功。
Message错误信息提示。
NumberTotalRows导入总处理的行数。
NumberLoadedRows成功导入的行数。
NumberFilteredRows数据质量不合格的行数。
NumberUnselectedRows被 where 条件过滤的行数。
LoadBytes导入的字节数。
LoadTimeMs导入完成时间,单位毫秒。
BeginTxnTimeMs向 FE 请求开始一个事务所花费的时间,单位:毫秒。
StreamLoadPutTimeMs向 FE 请求获取导入数据执行计划所花费的时间,单位:毫秒。
ReadDataTimeMs读取数据所花费的时间,单位:毫秒。
WriteDataTimeMs执行写入数据操作所花费的时间,单位:毫秒。
CommitAndPublishTimeMs向Fe请求提交并且发布事务所花费的时间,单位:毫秒。
ErrorURL如果有数据质量问题,通过访问这个 URL 查看具体错误行。

示例

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" http://host:port/api/test_db/test_table/_stream_load

取消导入

用户无法手动取消 Stream Load,Stream Load 在超时或者导入错误后会被系统自动取消。

查看 Stream Load

您可以通过show stream load来查看已经完成的Stream load任务。默认BE(BackEnd)不保留Stream Load的启用记录,如果您要查看则需要在BE上启用记录,配置参数为:enable_stream_load_record=true,具体操作请参见BE配置项。​

使用示例

  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,使用 Label 用于去重,指定超时时间为 100 秒。

     curl --location-trusted -u root -H "label:123" -H "timeout:100" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,使用 label 用于去重,并且只导入 k1 等于 20180601 的数据。

    curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,允许 20% 的错误率(用户是 defalut_cluster 中的)。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,允许 20% 的错误率,并且指定文件的列名。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表中的 p1、p2 分区,允许 20% 的错误率。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 使用流式方式(streaming)导入。

    seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/test_db/test_table/_stream_load
  • 导入含有 HLL 列的表,可以是表中的列或者数据中的列用于生成 HLL 列,也可使用 hll_empty 补充数据中没有的列。

    curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 导入数据进行严格模式过滤,并设置时区为 Africa/Abidjan。

    curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 删除与这批导入 key 相同的数据。

    curl --location-trusted -u root -H "merge_type: DELETE" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将这批数据中与 flag 列为 1 的数据相匹配的列删除,其他行正常追加。

    curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 导入数据到含有 sequence 列的 UNIQUE_KEYS 表中。

    curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T test.data http://host:port/api/test_db/test_table/_stream_load

相关系统配置

FE 配置

stream_load_default_timeout_second:导入任务的超时时间,单位:秒。默认的 timeout 时间为 600 秒,导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。如果导入的源文件无法在规定时间内完成导入,您可以在 Stream Load 请求中设置单独的超时时间或者调整 FE 的参数 stream_load_default_timeout_second 来设置全局的默认超时时间。

BE 配置

streaming_load_max_mb:Stream Load 的最大导入大小,单位:MB,默认值为 10240 MB。如果您的原始文件超过该值,则需要调整 BE 参数 streaming_load_max_mb