返回
技术分享

从 Flink 到 Doris 的实时数据写入实践——基于 Flink CDC 构建更实时、高效的数据集成链路

SelectDB 技术团队· 2025/08/01

Flink-Doris-Connector 作为 Apache Flink 与 Doris 之间的桥梁,打通了实时数据同步、维表关联与高效写入的关键链路。本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。

一、Apache Doris 简介

Apache Doris 是一款基于 MPP 架构的高性能、实时的分析型数据库,整体架构精简,只有 FE 、BE 两个系统模块。其中 FE 主要负责接入请求、查询解析、元数据管理和任务调度,BE 主要负责查询执行和数据存储。Apache Doris 支持标准 SQL 并且完全兼容 MySQL 协议,可以通过各类支持 MySQL 协议的客户端工具和 BI 软件访问存储在 Apache Doris 中的数据库。

在典型的数据集成和处理链路中,往往会对 TP 数据库、用户行为日志、时序性数据以及本地文件等数据源进行采集,经由数据集成工具或者 ETL 工具处理后写入至实时数仓 Apache Doris 中,并由 Doris 对下游数据应用提供查询和分析,例如典型的 BI 报表分析、OLAP 多维分析、Ad-hoc 即席查询以及日志检索分析等多种数据应用场景。

Apache Doris 简介 .PNG

Flink-Doris-Connector 是 Apache Doris 与 Apache Flink 在实时数据处理 ETL 的结合,依托 Flink 提供的实时计算能力,构建高效的数据处理和分析链路。Flink-Doris-Connector 的使用场景主要分为三种:

  1. Scan:通常用来做数据同步或是跟其他数据源的联合分析;
  2. Lookup Join:将实时流中的数据和 Doris 中的维度表进行 Join;
  3. Real-time ETL:使用 Flink 清洗数据再实时写入 Doris 中。

Apache Doris 简介 -2.PNG

本章节结合 Scan、Lookup Join、Write 这三种场景,介绍 Flink-Doris-Connector 的设计与实现。

01 Scan 场景

Scan 场景指将 Doris 中的存量数据快速提取出来,当从 Doris 中读取大量数据时,使用传统的 JDBC 方法可能会面临性能瓶颈。因此 Flink-Doris-Connector 中可以借助 Doris Source ,充分利用 Doris 的分布式架构和 Flink 的并行处理能力,从而实现了更高效的数据同步。

Doris Source 读取流程

  • Job Manager 向 FE 端发起请求查询计划,FE 会返回要查询的数据对应的 BE 以及 Tablet;
  • 根据不同的 BE,将请求分发给不同的 TaskManager;
  • 通过 Task Manager 直接读取每个 BE 上对应 Tablet 的数据。

通过这种方式,我们可以利用 Flink 分布式处理的能力从而提高整个数据同步的效率。

Doris Source 读取流程.PNG

02 Lookup Join 场景

Lookup Join 场景.PNG

对于维度表存储在 Doris 中的场景,可通过 Lookup Join 实现对实时流数据与 Doris 维度表的关联查询。

JDBC Connector

Doris 支持 MySQL 协议,所以可以直接使用 JDBC Connector 进行 Lookup Join,但是这一方式存在一定的局限:

  • Jdbc Connector 中的 Lookup Join 是同步查询的操作,会导致实时流中每条数据都要等待 Doris 查询的结果,增加了延迟。
  • 仅支持单条数据查询,在上游数据量吞吐较高时,容易造成性能瓶颈和反压。

因此针对 Lookup Join 场景 ,Flink-Doris-Connector 实现了异步 Lookup Join 和攒批查询的优化:

  • 支持异步 Lookup Join: 异步 Lookup Join 意味着实时流中的数据不需要显式等待每条记录的查询结果,可以大大的降低延迟性。
  • 支持攒批查询: 将实时流的数据追加到队列 Queue 中,后台通过监听线程 Watcher,将队列里面的数据取出来再推送到查询执行的 Worker 线程池中,Worker 线程会将收到的这一批数据拼接成一个 Union All 的查询,同时向 Doris 发起 Query 查询。

通过异步 Lookup join 以及攒批查询,可以在上游数据量比较大的时候大幅度提高维表关联吞吐量,保障了数据读取与处理的高效性。

03 实时 ETL 场景

实时 ETL 场景.png

对于实时写入来说,Doris Sink 的写入是基于 Stream Load 的导入方式去实现的。Stream Load 是 Apache Doris 中最为常见的数据导入方式之一,支持通过 HTTP 协议将本地文件或数据流导入到 Doris 中。主要流程如下:

  • Sink 端在接收到数据后会开启一个 Stream Load 的长链接请求。在 Checkpoint 期间,它会将接收到的数据以 Chunk 的形式持续发送到 Doris 中。
  • Checkpoint 时,会对刚才发起的 Stream Load 的请求进行提交,提交完成后,数据才会可见。

如何保证数据写入的 Exactly-Once 语义

如何保证数据写入的 Exactly-Once 语义 .png

那么,如何保证数据写入期间,端到端数据的精确一次性?

以 Kafka 同步到 Drois 的 Checkpoint 过程为例:

  1. Checkpoint 时,Source 端会接收到 Checkpoint Barrier;
  2. Source 端接收到 Barrier 后,首先会对自身做一个快照,同时会将 Checkpoint Barrier 下发到 Sink 端;
  3. Sink 端接收到 Barrier 后,执行 Pre-commit 提交,成功后数据就会完整写入到 Doris,由于此处执行的是预提交,所以在 Doris 上,此时对用户来说数据是不可见的;
  4. 将 Pre-Commit 成功的事务 ID 保存到状态中;
  5. 所有的算子 Checkpoint 都做完后,Job Manager 会下发本次 Checkpoint 完成的通知;
  6. Sink 端会对刚才 Pre-commit 成功的事务进行一次提交。

通过这种两阶段提交,就可以实现端到端的精确一次性。

实时性与 Exactly-Once

实时性与 Exactly-Once.png

上面提到,Doris Sink 端的写入与 Checkpoint 绑定,数据写入 Doris 的延迟性取决于 Checkpoint 的间隔。但在一些用户的场景下,希望数据可以实时写入,但是 Checkpoint 不能做的太频繁,同时对于一些作业来说,如果 Checkpoint 太频繁会消耗大量资源,针对该情况,Flink-Doris-Connector 引入了攒批机制,以平衡实时性与资源消耗之间的矛盾。

攒批的实现原理是 Sink 端接收上游数据之后,不会立即将每条数据单独写入 Doris,而是先在内存中进行缓存,然后通过对应参数设置,将缓存数据提交到 Doris 中。结合攒批写入和 Doris 中的主键模型,可以确保数据写入的幂等性。

通过引入攒批机制,既满足了用户对数据实时写入的需求,又避免了频繁 Checkpoint 带来的资源消耗问题,从而实现性能与效率的优化。

以上是对 Flink-Doris-Connector 的典型场景和实现原理介绍,接下来我们来看它在实际业务中的一个重要应用——整库同步。相比底层实现,整库同步更偏向具体使用场景。下面我们基于前面介绍的能力,进一步探讨如何通过 Flink CDC 实现 TP 数据库到 Doris 的高效、自动化同步。

01 整库同步痛点

在数据迁移过程中,用户通常希望可以尽快将数据迁移到 Doris 中,然而在同步 TP 数据库时,整库同步往往面临以下几点挑战:

  • 建表:
    • 存量表的快速批量创建:TP 数据库中往往存在成千上万的表,这些表的结构各异,对于存量表而言需要逐一在 Doris 中创建对应的表结构;
    • 同步任务开启后,新增表的自动创建与同步: 为了保证数据的完整性和实时性,同步工具需要实时监控 TP 数据库的变化,并自动在 Doris 中创建和同步新表。
  • 元数据映射: 上下游之间字段元数据的便捷映射,包括字段类型的转换、字段名称的对应修改等。
  • DDL 自动同步: 增加、删除列等操作会导致数据库结构发生变化,进而影响到数据同步。因此,同步工具需要能够实时捕获 DDL 并动态地更新 Doris 表结构,以确保数据的准确性和一致性。
  • 开箱即用: 零代码,低门槛,理想的同步工具只需进行简单配置,即可实现数据的迁移和同步。

在数据抽取方面,Flink-Doris-Connector 借用了 Flink CDC 的特性能力:

  • 增量快照读取
    • 无锁读取与并发读取:不论存量数据量多大,都可以通过横向提高 Flink 的并发提升数据读取速度。
    • 断点续传:当存量数据比较大时,可能面临同步中断的情况,CDC 支持中断任务的衔接同步。
  • 丰富数据源支持,Flink CDC 支持多种数据库,如 MySQL、Oracle、SQLServer 等。
  • 无缝对接 Flink 现有生态,方便与 Flink 已有Source 和 Sink 结合使用。

基于 Flink CDC 实现整库同步.png

一键建表与元数据自动映射

Flink-Doris-Connector 中集成了 Flink CDC 等能力,可以让用户只提交一个操作,就能进行整库同步的操作。其主要原理是 Flink CDC Source 在接收到上游的数据源之后,会进行分流处理,不同的表用不同的 Sink。同时在最新的 Connector 版本中,也支持单个 Sink 同步多张表,支持新增表的创建和同步。

集成 Flink CDC 的功能后,用户仅需通过 Flink-Doris-Connector 提交任务,就可以在 Doris 自动创建所需的表,而无需配置上下游表之间的显式关联,实现数据快速同步

当 Flink 任务启动后,Doris-Flink-Connector 将自动识别对应的 Doris 表是否存在。如果表不存在,Doris Flink Connector 会自动创建表,并根据 Table 名称进行分流,从而实现下游多个表的 Sink 接入;如果表存在,则直接启动同步任务。

这一改进,不仅简化了配置流程,还使得新增表的创建和同步更加便捷,从而提升数据处理的整体效率。

Light Schema Change 与 DDL 自动同步

Light Schema Change 与 DDL 自动同步.png

在 Apache Doris 1.2 版本之前,Schema Change 操作比较繁琐,需要手动增改数据列。在上游 TP 数据库发生表结构变更时,需要暂停数据同步任务、待 Doris 中的 Schema Change 完成后再重启任务。

自 Apache Doris 1.2 版本起,我们引入了轻量级的 Light Schema Change 机制,极大地简化了操作流程,常见的增减列场景其处理速度可达毫秒级。Light Schema Change 机制原理如下:

  • Schema Change:
    • 客户端向 FE 发起增减列的请求;
    • FE 在接收到请求后,修改当前元数据,并将最新的 Schema 持久化;
    • FE 向客户端同步 Schema Change 的结果;
  • Data Load:
    • 当后续导入任务发起时,FE 将导入任务与最新的 Schema 信息发送给 BE;
    • 在数据写入过程中,BE 的每个 Rowset 都会存储当前导入的 Schema 信息;
  • Query:
    • FE 将查询计划与最新的 Schema 一起发送给 BE;
    • BE 使用最新 Schema 执行查询计划;
  • Compaction:
    • 在 BE 中,对参与合并的 Rowset 版本进行比较;
    • 根据最新的 Schema Change 信息进行数据合并。

经测试,与早期的 Schema Change 相比,Light Schema Change 的数据同步性能有了数百倍的提升,

Light Schema Change 与 DDL 自动同步-2.png

Light Schema Change 与 Flink-Doris-Connector 的结合,通过 Flink CDC 可以实现 DDL 的自动同步,具体步骤如下:

  1. Source 端捕获上游 Schema Change 信息,开启 DDL 变更同步;
  2. Doris Sink 端识别并解析 DDL 操作(加减列);
  3. Table 校验,判断是否可以进行 Light Schema Change;
  4. 发起 Schema Change 操作;

基于这一实现,Doris 能自动获取到 DDL 语句并在毫秒级即可完成 Schema Change 操作,在上游 TP 数据库发生表结构变更时,数据同步任务无需暂停。

开箱即用:MySQL 整库同步示例

开箱即用:MySQL 整库同步示例.png

对于用户来讲,只要有 Flink 客户端,通过上图的操作就可以提交整库同步作业。支持传入 Flink 的配置,比如并发设置、Checkpoint 间隔等,也支持正则表达式去配置需要同步的表, 同时可以将 Flink CDC Source 和 Doris Sink 的配置直接透传给具体的 Connector。通过这种方式,用户可以很便捷地提交整库同步作业。

基于以上优化,可以完美解决用户的痛点:

  1. 自动建表,即存量表与增量表的自动创建,无需用户提前在 Doris 中预先创建对应的表结构;
  2. 自动映射上下游字段,无需手动写入上下游字段间的匹配规则,节省大量人力成本;
  3. 增减列无感同步,及时获取上游 DDL 语句并自动在 Doris 中实现毫秒级 Schema Change,无需停服、数据同步任务平稳运行;
  4. 开箱即用,降低学习成本,更专注业务本身。

04 最佳实践

在生产环境中,若作业数量较多,直接采用上述提交方式的作业管理复杂度较高。通常建议借助任务托管平台(如 StreamPark),实现对作业的统一创建、监控与运维,从而提升任务管理效率与系统稳定性。

最佳实践.png

最佳实践-2.png

四、未来规划

未来,基于 Flink-Doris-Connector 的能力规划如下:

  1. 支持实时读取。目前 Doris Source 只是把数据 Scan 出来,是一个有界流的读取,后续会支持 CDC 的场景,可以使用 Flink 来对 Doris 中的数据进行流式的读取。
  2. Sink 一流多表。目前Flink-Doris-Connector支持单个 Sink 同步多张表,但是 Stream Load 的导入方式还是只支持单个表的导入。所以在表特别多的时候,需要在 Sink 端维护大量 StreamLoad 的连接,在后续会做到单个 Stream Load 的连接支持多张表的写入。
  3. 整库同步方面,支持更多的上游数据源,满足更多数据同步场景。