SelectDB Cloud
开发指南
数据导入
Stream Load 导入

Stream Load 导入

本文介绍如何通过 Stream Load 导入数据至 SelectDB Cloud 仓库中。

背景信息

Stream Load 是属于同步接口的导入方式,用户通过发送 HTTP 请求将本地文件或数据流导入到 SelectDB Cloud 仓库中。Stream Load 执行并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。

Stream Load 主要适用于导入本地文件或通过程序导入数据流中的数据,支持的数据格式包括:CSV(文本)、JSON、PARQUET 和 ORC。

创建导入

Stream Load 通过 HTTP 协议提交和传输数据,这里通过 curl 命令展示如何提交导入。用户也可以通过其他 HTTP Client 进行操作。

语法

# Header 中支持的属性,请参见下面的参数说明。
# 格式为: -H "key1:value1"。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://host:port/api/{db}/{table}/_stream_load

参数说明

参数名称参数说明
--location-trusted当需要认证时,会将 user 和 password 传递给被重定向到的服务器。
-u用户名和密码。
-T需要导入的数据文件。
-XPUTHTTP 请求的 Method,采用 PUT 请求方法。其中 host 为 SelectDB Cloud 仓库的私网地址或公网地址,port 为 HTTP 端口号。

由于 Stream Load 使用的是 HTTP 协议,所以导入任务有关的参数主要设置在 Header 中。常用的导入参数如下。

参数名称参数说明
label导入任务的唯一标识。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。label 也可用于防止用户重复导入相同的数据,强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once。当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。
format指定导入数据格式,支持 CSV、JSON、PARQUET、ORC,默认值为 CSV。支持 csv_with_names(CSV 文件行首过滤)、csv_with_names_and_types(CSV 文件前两行过滤)。
line_delimiter用于指定导入文件中的换行符,默认为 \n。您可以使用做多个字符的组合作为换行符。
column_separator用于指定导入文件中的列分隔符,默认为 \t。您可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加 \x 作为前缀,使用十六进制来表示分隔符。如Hive文件的分隔符 \x01,需要指定为 -H "column_separator:\x01"。
compress_type指定文件的压缩格式。目前只支持 CSV 文件的压缩,支持 gz、lzo、bz2、lz4、lzop、deflate 压缩格式。
max_filter_ratio导入任务的最大容忍率,默认为 0 容忍,取值范围是 0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
cloud_cluster用于指定导入使用的集群。如果不指定,则使用用户的默认集群,如果用户没有设置默认集群,则自动为用户选择一个有权限的集群。
where导入任务指定的过滤条件。支持对原始数据指定 where 语句进行过滤,被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入 num_rows_unselected。
partitions待导入数据的 Partition 信息。如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL。
columns待导入数据的函数变换配置。支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
merge_type数据合并类型,默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 表模型。其中 MERGE 类型需要配合 delete 参数使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。
delete仅在 MERGE 下有意义,表示数据的删除条件 function_column.sequence_col:只适用于 UNIQUE_KEYS,相同 key 列下,保证 value 列按照 source_sequence 列进行 REPLACE, source_sequence 可以是数据源中的列,也可以是表结构中的一列。
exec_mem_limit导入内存限制。默认为 2 GB,单位为字节。
timeout指定导入的超时时间,单位:秒,默认是 600 秒。可设置范围为 1~259200 秒。
timezone指定本次导入所使用的时区,默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。
two_phase_commit是否开启两阶段事务提交模式,默认为 false。开启两阶段事务提交模式后,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为 PRECOMMITTED,用户手动触发 commit 操作之后,数据才可见。

由于 Stream Load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户,返回结果样例如下。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

返回结果参数说明如下。

参数名称参数说明
TxnId导入的事务 ID,用户可不感知。
Label导入 Label,由用户指定或系统自动生成。
Status导入状态。"Success" 表示导入成功。"Publish Timeout" 表示导入已经完成,只是数据可能会延迟可见,无需重试。"Label Already Exists" 表示 Label 重复,需更换 Label。"Fail" 表示导入失败。
ExistingJobStatus已存在的 Label 对应的导入作业的状态。这个字段只有在当 Status 为 "Label Already Exists" 时才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。"RUNNING" 表示作业还在执行。"FINISHED" 表示作业成功。
Message错误信息提示。
NumberTotalRows导入总处理的行数。
NumberLoadedRows成功导入的行数。
NumberFilteredRows数据质量不合格的行数。
NumberUnselectedRows被 where 条件过滤的行数。
LoadBytes导入的字节数。
LoadTimeMs导入完成时间,单位毫秒。
BeginTxnTimeMs向 FE 请求开始一个事务所花费的时间,单位:毫秒。
StreamLoadPutTimeMs向 FE 请求获取导入数据执行计划所花费的时间,单位:毫秒。
ReadDataTimeMs读取数据所花费的时间,单位:毫秒。
WriteDataTimeMs执行写入数据操作所花费的时间,单位:毫秒。
CommitAndPublishTimeMs向Fe请求提交并且发布事务所花费的时间,单位:毫秒。
ErrorURL如果有数据质量问题,通过访问这个 URL 查看具体错误行。

示例

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" http://host:port/api/test_db/test_table/_stream_load

取消导入

用户无法手动取消 Stream Load,Stream Load 在超时或者导入错误后会被系统自动取消。

查看 Stream Load

您可以通过show stream load来查看已经完成的Stream load任务。默认BE(BackEnd)不保留Stream Load的启用记录,如果您要查看则需要在BE上启用记录,配置参数为:enable_stream_load_record=true,具体操作请参见BE配置项 (opens in a new tab)。​

使用示例

  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,使用 Label 用于去重,指定超时时间为 100 秒。

     curl --location-trusted -u root -H "label:123" -H "timeout:100" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,使用 label 用于去重,并且只导入 k1 等于 20180601 的数据。

    curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,允许 20% 的错误率(用户是 defalut_cluster 中的)。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表,允许 20% 的错误率,并且指定文件的列名。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将本地文件 test.data 中的数据导入到数据库 test_db 中的 test_table 表中的 p1、p2 分区,允许 20% 的错误率。

    curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 使用流式方式(streaming)导入。

    seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/test_db/test_table/_stream_load
  • 导入含有 HLL 列的表,可以是表中的列或者数据中的列用于生成 HLL 列,也可使用 hll_empty 补充数据中没有的列。

    curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 导入数据进行严格模式过滤,并设置时区为 Africa/Abidjan。

    curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 删除与这批导入 key 相同的数据。

    curl --location-trusted -u root -H "merge_type: DELETE" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 将这批数据中与 flag 列为 1 的数据相匹配的列删除,其他行正常追加。

    curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T test.data http://host:port/api/test_db/test_table/_stream_load
  • 导入数据到含有 sequence 列的 UNIQUE_KEYS 表中。

    curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T test.data http://host:port/api/test_db/test_table/_stream_load

相关系统配置

FE 配置

stream_load_default_timeout_second:导入任务的超时时间,单位:秒。默认的 timeout 时间为 600 秒,导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。如果导入的源文件无法在规定时间内完成导入,您可以在 Stream Load 请求中设置单独的超时时间或者调整 FE 的参数 stream_load_default_timeout_second 来设置全局的默认超时时间。

BE 配置

streaming_load_max_mb:Stream Load 的最大导入大小,单位:MB,默认值为 10240 MB。如果您的原始文件超过该值,则需要调整 BE 参数 streaming_load_max_mb

© 2023 北京飞轮数据科技有限公司 京ICP备2022004029号 | Apache、Apache Doris 以及相关开源项目名称均为 Apache 基金会商标