找回密码
 立即注册
查看: 201|回复: 0

Flink CDC 在京东的探索与实践

[复制链接]
发表于 2024-7-15 18:44 | 显示全部楼层 |阅读模式
摘要:本文整理自京东资深技术专家韩飞,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部门:

1. 京东自研 CDC 介绍
2. 京东场景的 Flink CDC 优化
3. 业务案例
4. 未来规划

点击查看直播回放和演讲 PPT

一、京东自研 CDC 介绍




京东自研 CDC 代号为 Fregata,是我们针对数据实时采集和分发场景自研的底层框架。Fregata 是一种动物,叫做军舰鸟,它是世界上飞翔速度最快的鸟,即使在恶劣天气下也能保持很好的飞翔能力及机动性,寓意我们整个实时采集、分发处事的高效不变。

目前,Fregata 是京东集团数据中台实时采集和分发的统一入口,处事京东零售、物流、科技、健康和工业等 BGBU,覆盖订单交易、商智黄金眼、实时风控、京东白条、实时大屏等核心业务。




目前 Fregata 线上不变运行任务超过两万,大促措置条数峰值为 64.1 亿条/min,这个是采集和分发的总数据条数,对应的传输数据量峰值为 8.3TB/min。

针对单数据库实例的采集能力超过 500w 条/min,远超数据库主从同步的速率。




Fregata 任务目前总计使用 CPU 资源超过 6 万核,内存使用超过 18wGB。

我们基于京东 JDOS 平台实现了 Fregata 任务的容器化部署和运行,而且撑持任务的跨机房部署,目前任务主要分布汇天和廊坊两个机房,两个机房互为主备。

容灾方面,撑持任务的一键容灾切换,如果呈现机房大面积故障或者断网等情况,可以快速将任务切换到备机房,从而保证任务的快速恢复和不变运行。




上图左侧主要展示了 Fregata 的整体架构。

首先,Fregata 按照功能分为实时采集和实时分发两部门,实时采集基于数据库主从复制道理,实时捕捉 Binlog 数据进行解析并按照必然的格式进行封装,然后发送到京东自研动静队列 JDQ 中,供下游业务实时消费,目前撑持的源端数据库类型有物理 MySQL,京东自研弹性数据库 JED、京东云 RDS、京东数科 CDS 及 Oracle,此中 Oracle 是通过 Logminer 来实现对数据库日志的实时采集。

实时分发部门,主要是将 JDQ 中多种格式的数据实时同步到分歧的方针存储中,目前撑持的动静格式有 CSV/JSON/ProtoBuf/Xml/Avro 等, 目前撑持的方针存储有 HDFS 或者 Hive(对应离线数仓),OLAP 分析引擎包罗 Doris 和 ClickHouse,动静队列 JDQ,ElasticSearch 及数据湖存储 Iceberg。撑持的数据源端、方针端城市按照实际需求不竭进行丰硕。

Fregata 做采集和分发的拆分这样的设计主要是基于“一次采集、多次分发”的思路,这样的好处是既可以减少对上游数据库的负载,又可以满足业务多次消费、多种分歧类型消费、以及短期内数据回放的需求,这里 JDQ 数据一般保留 7 天。

上图右侧主要展示了 Fregarat 引擎的设计框架,整个引擎主要分为三层,分袂是 Source、Parse、Sink 算子,每层算子之间通过 RingBuffer 进行链接(我们选用的 disruptor)。


  • Source 算子按照数据源类型的分歧实现源端数据的拉取并推到 RingBuffer 中。
  • Parse 算子从 RingBuffer 中拉取数据,对数据进行解析组装和一些 ETL 加工,然后将数据发送到下游的 RingBuffer 中。
  • Sink 算子拉取下游 RingBuffer 中的数据并按照方针数据源的要求进行必然的数据格式的组装,然后发送到分歧的方针存储上。
此外,还有一个 BarrierService 按时发生 Barrier,整个任务通过 Barrier 处事来完成状态的提交和记录,其道理跟 Flink 中 Checkponit 机制类似。BarrierService 按时发生 Barrier 并传递给 Source 算子,Source 算子在拿到 Barrier 之后以广播的形式传递给下游的 Parse,下游的 Parse 拿到 Barrier 之后再以广播的形式传递给所有的 Sink 算子,当每个 Sink 算子收到所有 Barrier 之后会向 BarrierService 进行 ack 操作,这时 BarrierService 会进行一系列的状态提交,例如提交消费位点、记录的 Binlog 位置等。




我们接着看 Fregata 的技术特性,首先是关于 Binlog 的位点追踪。

上图右侧主要介绍了实时采集任务启动运行的整个流程。此中位点处事中记录任务上次已经消费的 Binlog 位点信息,主要包罗 Binlog 文件名称,该 Binlog 文件已经消费到的位置,数据库实例的 serverid,该 Binlog 位置对应的事务发生时间以及 GTID 信息。

采集任务启动时会向位点处事获取上次记录的 Binlog 位点信息,然后将记录的 BinlogPosition 或者 GTID 信息传递给 Binlog Connector,Binlog Connector 按照 BinlogPostion 或者 GTID 信息生成 dump 命令并发送给数据库实例,然后数据库实例将 Binlog 日志推送给 Binlog Connector,Binlog Connector 将接受的 Binlog 日志进行反序列化并封装成 Binlog Event 传递给 Fregata,Fregata 对 Binlog Event 进行相关措置后发送给 JDQ。

由于 MySQL 在 5.6.5 版本之后才有 GTID,而且京东线上业务库存在数据库版本较低的现象,因此 Fregata 对 BinlogPosition 和 GTID 两种方式都进行了撑持,而且撑持从指按时间点、最新位点、起始位点以及指定 Binlog 位置,多种消费模式灵活配置。

此外,当上游数据库版本升级至高版本并开启了 GTID 后,就存在采集任务需要从 BinlogPosition 模式切换成 GTID 模式的场景,所以 Fregata 也撑持了任务的位点模式在 BinlogPosition 和 GTID 之间自动切换的功能,而且在切换的过程中保证数据不丢不重。

切换过程如上图中左下角所示,首先任务从 BinlogPosition 模式重启,然后查询并缓存在这个重启过程中已经执行的 GTID 事务。接着任务会先以 BinlogPosition 模式继续措置 Binlog 中的 GTID EVENT,并判断前边缓存的 GTID 中是否包含当前已消费的 GTID,如果不包含,则说明消费进度已经追上,此时任务将位点记录模式直接切换成 GTID 模式。




接着介绍 Fregata 动态感知相关的功能。Fregata 实时采集任务配置是数据库域名,如果线上数据库存在故障或者要下线,则会有数据库实例需要发生变换的场景,Fregata 是可以感知到变换并自动进行切换的。

由于切换前后两个数据库实例 Binlog 文件一般都是纷歧致的,如果此时任务位点记录方式是 BinlogPosition 模式,则在切换之后任务需要自动进行 Binlog 对齐操作,进而保证数据的完整性。(GTID 模式是不需要考虑这个问题的)

整个切换过程如上图右侧所示,BinlogPosition 模式下,任务会查询出新数据实例上全部的 Binlog 文件,并按照倒序对 Binlog 文件进行遍历,然后按照位点处事中记录的时间戳查询出对应的 Position,然后任务从查询出的该 Position 继续消费。这种倒序查找的方式主要是针对线上切库的场景,这种情况下采用倒叙的查询效率斗劲高,一般查找 1-2 分钟前的 Binlog 即可。

Fregata 动态感知能力还表此刻 DDL 变换的感知上,Fregata 能够识别数据库中的 DDL 操作并自动进行适配,目前撑持的 DDL 变换类型包罗,比如新增、删减字段,改削字段类型、字段挨次调整等。

由于下游业务方也会存眷数据库的 DDL 操作,因此 Fregata 在识别到 DDL 操作时,还会自动以邮件或者语音的方式通知打点员及用户进行存眷。




Fregata 也具备一些数据加工及丰硕的能力。

Fregata 在采集 Binlog 的过程中,会对每一笔记录增加一个独一的版本号 Mid(也就是 message id),下游用户可以按照这个版本号进行去重或者确定最新的变换记录,比如当将增量数据分发到 Hive 或者其他无主键约束的存储中时,用户可以按照 Mid 来确定对于同一个主键的多条操作记录,哪条是最新的变换操作。

此外,Fregata 还会将数据库、表及数据库实例等信息作为元数据封装到每条动静体中,便利下游有相关需求的业务用于判断数据的来源。

在数据加工方面,采集过程中还撑持使用多种函数对数据进行加工措置,如敏感字段加解密、类型转换、时间转换等。

在部署方面,如果上游业务库是分库分表模式并覆盖多个实例,Fregata 将会按照数据库实例个数启动多个采集任务,采集任务和数据库实例一一对应。

这样的好处是任务彼此独立而且资源隔离,单一数据库实例的变换不影响其他数据库实例的采集任务,劣势是如果实例数量较多,配置和维护成本会略高; 配置方面,我们通过产物化流程解决这个问题,实现一次配置。




告警方面,Fregata 撑持任务存活告警,在任务存活异常的情况下,运维人员会收到语音或者邮件报警信息。同时,采集任务会按分钟粒度上报采集延迟、数据库主从延迟和抽取零值的这些监控指标信息,供用户不雅观测任务运行情况。

全增量数据撑持方面,Fregata 目前只撑持增量数据的抽取,全量数据的抽取依赖 Binlog 保留时间。

换句话说,如果 Binlog 数据全量保留,则可以抽取全部数据,否则,只能抽取保留的 Binlog 数据,其他更早的历史数据需要离线抽取来抵偿。

二、京东场景的 Flink CDC 优化

上边是关于 Fregata 的内容,整体来讲,目前我们对于 Flink CDC 的使用还处在一个多方面验证和相对初级的阶段。针对京东内部的场景,我们在 Flink CDC 中适当补充了一些特性来满足我们的实际需求。所以接下来一起看下京东场景下的 Flink CDC 优化。




在实践中,会有业务方提出但愿按照指按时间来进行历史数据的回溯,这是一类需求;还有一种场景是当本来的 Binlog 文件被全部清理,这时需要重置到新发生的 Binlog 文件上。

针对上述场景,我们通过复用 scan.startup.mode 参数,扩展 earliest-offset\timestamp\specific-offset 三种 Binlog 阶段的启动模式。

此中 specific-offset 模式下,需要设置 scan.startup.specific-offset.file 参数指定 Binlog 文件名称、scan.startup.specific-offset.pos 指定该文件的某一个位置,按照这两个参数来确定增量阶段要消费的起始位置;earliest-offset 模式下默认会读取最早的 Binlog 文件;timestamp 模式,需要设置一个时间参数 scan.startup.timestamp-millis。

如上图右侧所示,在 timestamp 启动模式下,会按照用户指定的时间按照倒序的方式去查找相应的 Binlog 文件以及 Position,最终底层模式完全复用 specific-offset 的方式。

不管使用哪种模式,最终城市按照分歧的启动模式构建正确的 Start Binlog Offset,并进一步构建 MySQLBinlogSplit。




在低版本 MySQL 的出产中,会存在数据库实例下线,或者从库存在显著的主从延迟(需迁移至其他从库);在这两种场景下,一般会进行切库操作。如何实现自动切库呢?或者说如何实此刻低版本 MySQL 的 Binlogposition 模式下的自动切库呢?

如上图右侧所示,我们增加了一步 切库查抄的操作:

首先,在 MySQLBinlogsplit 中增加了对 MySQL 层面的 serverid 信息的保留,并改削了 state 保留&恢复过程中对 MySQLSplitBinlog 对象的措置逻辑。

然后,查询 MySQL 实例获取 serveid,并与 MySQLBinlogsplit 对象中存储的 serverid 进行对比。

如果纷歧致, 则认为发生切库操作,此时需要按照 Binlogoffset 保留的消费位点的时间信息,也就是 timestamp,在新库中倒序查找并从头构建 start Binlogoffset 以及进一步构建 MySQLBinlogsplit。




当前 Flink MySQL CDC 撑持采集时延、发送时延、空闲时长的监控指标,在实际出产中,用户反馈有需要存眷上游数据库主从延迟的需求。同时,所有监控指标都存在可视化及异常报警需求。

基于上述情况,首先我们新增了数据库主从延迟的监控指标,并将所有这些监控指标对接到监控系统 Byzer。如上图所示,整体流程是这样,Flink JobManager 和 TaskManager 启动时会携带 agent,会通过 agent 将监控数据发送到 Byzer 系统。

用户可以在 JRC 平台(实时计算平台)配置监控报警法则,这些法则会被同步到 Byzer 系统。另一方面,JRC 平台会拉取 Byzer 监控系统数据并进行可视化展示。




最后来看一个偏应用层面的改造,在实际的业务中大量存在分库分表的场景,而且线上分库分表基本会分布在多个 MySQL 实例中。

社区版本 Flink MySQL CDC 如果要在一个作业中撑持多实例,需要用户多次复制 DDL 定义语句并改削 hostname 配置,如果实例数量多的话是斗劲影响用户体验及 SQL 的可读性。

对此,我们结合平台实现了多实例的撑持。通过 calcite 解析用户的 SQL 语句,找到 MySQL-cdc 的 DDL 定义,并解析此中 hostname 字段来判断是否包含多实例,也就是配置了多个 host。如果包含多个实例,则自动按实例分割,创建分歧实例对应的表,最后再 union 为一个视图。如图中蓝色卷轴示例所示,此时只需要做一次 DDL 的定义。

此外,在采集多实例,写带 Primary Key 的 Sink 场景中,我们做了一个优化。由于 Flink MySQL CDC 进入 Binlog 阶段后只会在 Source 算子的第一个 subtask 中执行任务,而 Primary Key Sink 会触发 Flink 引擎优化 Sink 算子增加 NotNullEnforcer 算子来查抄数据相关的 not null 的字段,然后再进行 hash 分发到 SinkMaterializer 算子以及后面的 Sink 算子。

由于 Source 与 NotNullEnforcer 之间是 forward 关系,因此 NotNullEnforcer 也只有一个 task 措置数据,这在 Source 较多的场景下措置性能可能是不够的。

为充实操作 NotNullEnforcer 算子的并行度,我们增加了 table.exec.sink.not-null-enforcer.hash 参数,然后在 commonExecSink 中增加 通过该参数来判断是否要加速 NotNullEnforcer 算子 这样的逻辑。如果开启加速,则提前使用 Primary Key 进行 hash,然后再分发到 NotNullEnforcer 算子,从而实现对 NotNullEnforcer 算子的优化。




来看下优化前后的对比。

第一个图中可以看到,如红框所示,NotNullEnforcer 算子中只有第一个 Task 在措置数据。

优化后,在第二个图中,可以看到 NotNullEnforcer 算子的所有 10 个并行度都被操作了起来,而且 Source 算子和 NotNullEnforcer 算子之间是 hash 关系。

三、业务案例

在这个案例中,我们结合 Flink CDC、Flink 核心计算能力以及数据湖 Hudi,对我们平台的一个业务方,京东物流的一个业务数据系统进行了技术架构的试点改造。




这个系统是物流运营数据中心 LDC 中的中小件实时运营监控系统。它在整个京东物流内部被高频使用,非论是打点者用于决策,还是一耳目员用于精细化进度打点。

它覆盖物流的三大核心操作环节,揽收、分拣、配送, 并在分歧的维度进行下钻,来提供物流各环节操作单量的监控以及可视化。




上游是弹性数据库 JED,分库分表而且分布在多个实例上。

在上边的离线链路中,首先通过 plumber 将数据抽取到离线数仓的 BDM 层,plumber 是京东离线异构数据交换的基础处事,负责将分歧数据源的数据抽取至数仓或者将数仓计算成果推送到分歧的数据源中。

在数据抽取到 BDM 层后,数据会颠末 FDM 层的拉链以及后边几层的数据加工,最后业务数据的成果汇总至 APP 层,再通过 plumber 将成果推送至 ES 中,LDC 的用户使用的产物底层查询 ES。还存在此外一种方式,OLAP 引擎 StarRocks 会导入 app 层的数据,然后供用户查询。

下边实时链路中,Fregata 采集数据库 Binlog 发送至 JDQ,Flink 消费 JDQ 数据继续写入 JDQ,以此往复,对应于离线数仓的分层逻辑,构建了基于 JDQ 的实时数仓, 最终的成果通过一个叫 syncer 的同步东西,将数据从 JDQ 同步到 ES 和 StarRocks 中。

同时,还存在另一条链路,最上游的 JDQ 通过 Fregata 直接分发数据到离线的 BDM 层,构建准实时的 BDM 表。整体来看,属于典型的 Lambda 数据架构。

当前的架构存在几个痛点:


  • 离线链路存在 SLA 撞线的问题,当上游链路计算资源拥挤或者呈现异常重试的情况时数据的时效性有可能不如按时达成。
  • ES 处事器的存储成本斗劲高,一年在 100 万摆布。
  • 典型 Lambda 架构的一些问题,由于流批割裂导致的处事器资源无法复用,技术栈分歧,开发效率低,数据口径纷歧致等问题。



由于这个业务的实时数据接受端到端分钟级此外时延,因此对这个数据架构做了些改造。

首先基于我们改造后的 Flink CDC 能力, 实现了一个 Flink 作业,对上游多实例的 JED 分库分表数据,进行全增量一体化采集。

在数据加工层面,结合 FlinkSQL,为用户提供了低代码的开发方式,也就是拖拽+SQL,计算的成果写入数据湖 Hudi。

然后再基于 Hudi 的增量读取能力,进一步加工,完成 FDM、GDM、APP 等分歧层的加工逻辑,成果通过 StarRocks 挂载 Hudi 外部表 ,提供给终端 LDC 用户查询。通过这样的改造,最终构建了一条端到端准实时的数据链路。

总结:首先,结合 Flink CDC、Flink 核心计算能力及 Hudi 初度实现端到端流批一体。可以看到,覆盖采集、存储、计算三个环节。最终这个链路是端到端分钟级别数据时延(2-3min),数据时效的提升有效驱动了新的业务价值,例如对于物流履约达成以及用户体验的提升。数据时效成本方面,解决离线撞线问题,一条准实时链路,不存在离线撞线;Hudi+StarRocks 的组合成底细较于 ES 显著降低(经评估,约为本来的 1/3)。相较于 Lambda 架构,在处事器成本、开发效率及数据质量方面都有显著的提升。

四、未来规划




未来规划包含以下几个方面:


  • 测验考试实现任务不竭止的 Schema Evolution。例如针对 Hudi、针对 JDQ。
  • 继续基于京东场景的 Flink CDC 改造。比如数据加密、全面对接实时计算平台 JRC 等。
  • 测验考试将部门 Fregata 出产任务切换 Flink CDC。好处是技术栈统一,符合整体技术收敛的趋势。
  • 结合流批一体的存储来提升端到端的整体时效性。 例如结合 Table Store 去测验考试实现端到端更低的,例如秒级此外时延。

点击查看直播回放和演讲 PPT


<hr/>更多内容



<hr/>活动保举
阿里云基于 Apache Flink 构建的企业级产物-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc


本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-11 05:43 , Processed in 0.100453 second(s), 27 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表