华为云MRSCDL架构设计与实现
1 前⾔
MRS CDL是华为云FusionInsight MRS推出的⼀种数据实时同步服务,旨在将传统OLTP数据库中的事件信息捕捉并实时推送到⼤数据产品中去,本⽂档会详细为⼤家介绍CDL的整体架构以及关键技术。
2 CDL的概念
MRS CDL(Change Data Loader)是⼀款基于Kafka Connect的CDC数据同步服务,可以从多种OLTP数据源捕获数据,如Oracle、MySQL、PostgreSQL等,然后传输给⽬标存储,该⽬标存储可以⼤数据存储如HDFS,OBS,也可以是实时数据湖Hudi等。
2.1 什么是CDC?
CDC(Change Data Capture)是⼀种通过监测数据变更(新增、修改、删除等)⽽对变更的数据进⾏进⼀步处理的⼀种设计模式,通常应⽤在数据仓库以及和数据库密切相关的⼀些应⽤上,⽐如数据同步、备份、审计、ETL等。
CDC技术的诞⽣已经有些年头了,⼆⼗多年前,CDC技术就已经⽤来捕获应⽤数据的变更。CDC技术能够及时有效的将消息同步到对应的数仓中,并且⼏乎对当前的⽣产应⽤不产⽣影响。如今,⼤数据应⽤越来越普遍,CDC这项古⽼的技术重新焕发了⽣机,对接⼤数据场景已经是CDC技术的新使命。
当前业界已经有许多成熟的CDC to⼤数据的产品,如:Oracle GoldenGate(for Kafka)、 Ali/Canal、Linkedin/Databus、
Debezium/Debezium等等。
2.2 CDL⽀持的场景
MRS CDL吸收了以上成熟产品的成功经验,采⽤Oracle LogMinner和开源的Debezium来进⾏CDC事件的捕捉,借助Kafka和Kafka Connect的⾼并发,⾼吞吐量,⾼可靠框架进⾏任务的部署。
现有的CDC产品在对接⼤数据场景时,基本都会选择将数据同步到消息队列Kafka中。MRS CDL在此基础上进⼀步提供了数据直接⼊湖的能⼒,可以直接对接MRS HDFS和Huawei OBS以及MRS Hudi、ClickHouse等,解决数据的最后⼀公⾥问题。
场景数据源⽬标存储
实时数据湖分析Oracle Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive
实时数据湖分析MySQL Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive
实时数据湖分析PostgreSQL Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive
表1 MRS CDL⽀持的场景
3 CDL的架构
作为⼀个CDC系统,能够从源⽬标抽取数据并且传输到⽬标存储中去是基本能⼒,在此基础上,灵活、⾼性能、⾼可靠、可扩展、可重⼊、安全是MRS CDL着重考虑的⽅向,因此,CDL的核⼼设计原则如下:
系统结构必须满⾜可扩展性原则,⽀持在不损害现有系统功能的前提下添加新的源和⽬标数据存储。
架构设计应当满⾜不同⾓⾊间的业务侧重点分离正则匹配一张图片
在合理的情况下减少复杂性和依赖性,最⼤限度的降低架构、安全性、韧性⽅⾯的风险。
需要满⾜插件式的客户需求,提供通⽤的插件能⼒,使得系统灵活、易⽤、可配置。
业务安全,避免横向越权和信息泄露。
3.1 架构图/⾓⾊介绍
图1 CDL架构
MRS CDL包含CDL Service和CDL Connector两个⾓⾊,他们各⾃的职能如下:
CDL Service:负责任务的管理和调度,提供统⼀的API接⼝,同时监测整个CDL服务的健康状态。
CDL Connector:本质上是Kafka Connect的Worker进程,负责真实Task的运⾏,在Kafka Connect⾼可靠、⾼可⽤、可扩展的特性基础上增加了⼼跳机制来协助CDL Service完成集的健康监测。
3.2 为什么选择Kafka?
我们将Apache Kafka与Flume和Nifi等各种其他选项进⾏了⽐较,如下表所⽰:
|Flume|Nifi|Kafka
:-- :-- :-- :--:
优点|基于配置的Agent架构;;Source、Channel、Sink模型| 有许多开箱即⽤的处理器;背压机制;处理任意⼤⼩的消息;⽀持MiNifi Agent来收集数据;⽀持边缘层数据流|可扩展、分布式、⾼容错、⾼吞吐量的消息传递系统;背压机制;⽆数据丢失;Kafka Connect ⽀持Source、Sink模型;超过50种可⽤的Connector;消息保序;低耦合
缺点|存在数据丢失的场景;没有数据备份;数据⼤⼩限制;没有背压机制|没有数据复制;脆弱的容错机制;不⽀持消息保序;可扩展性较差|消息⼤⼩限制
表1 框架⽐较
对于CDC系统,Kafka有⾜够的优势来⽀撑我们做出选择。同时,Kafka Connect的架构完美契合CDC系统:
并⾏ - 对于⼀个数据复制任务,可以通过拆解成多个⼦任务并且并⾏运⾏来提⾼吞吐率。
保序 - Kafka的partition机制可以保证在⼀个partition内数据严格有序,这样有助于我们实现数据完整性。
可扩展 - Kafka Connect在集中分布式的运⾏Connector。
易⽤ - 对Kafka的接⼝进⾏了抽象,提升了易⽤性。
均衡 - Kafka Connect⾃动检测故障,并在剩余进程上根据各⾃负载重新进⾏均衡调度。
⽣命周期管理 – 提供完善的Connector的⽣命周期管理能⼒。
4 MRS CDL关键技术
图2 CDL关键技术
4.1 CDL Job
MRS CDL对业务进⾏了上层的抽象,通过引⼊CDL Job的概念来定义⼀个完整的业务流程。在⼀个Job中,⽤户可以选择数据源和⽬标存储类型,并且可以筛选要复制的数据表。
在Job结构的基础上,MRS CDL提供执⾏CDL Job的机制,在运⾏时,使⽤Kafka Connect Source Connector结合⽇志复制技术将CDC事件从源数据存储捕获到Kafka,然后使⽤Kafka Connect Sink Connector从Kafka提取数据,在应⽤各种转换规则后将最终结果推送到⽬标存储。
提供定义表级和列级映射转换的机制,在定义CDL Job的过程中可以指定转换规则。
4.2 Data Comparison
MRS CDL提供⼀种特殊的Job,⽤于进⾏数据⼀致性对⽐。⽤户可以选择源和⽬标数据存储架构,从源和⽬标架构中选择各种⽐较对进⾏数据⽐较,以确保数据在源和⽬标数据存储中⼀致。
图3 Data Comparison抽象视图
MRS CDL提供了专⽤的Rest API来运⾏Data Compare Job,并且提供如下能⼒:
提供多样的数据⽐较算法,如⾏哈希算法,⾮主键列⽐较等。
提供专门的查询接⼝,可以查询同步报表,展⽰当前Compare任务的执⾏明细。
提供实时的基于源和⽬标存储的修复脚本,⼀键修复不同步数据。
如下是Data Compare Job执⾏流程:
图4 Data Compare Job执⾏和查看流程
4.3 Source Connectors
MRS CDL通过Kafka Connect SDK创建各种源连接器,这些连接器从各种数据源捕获CDC事件并推送到Kafka。CDL提供专门的Rest API 来管理这些数据源连接器的⽣命周期。
4.3.1 Oracle Source Connector
Oracle Source Connector使⽤Oracle RDBMS提供的Log Miner接⼝从Oracle数据库捕获DDL和DML事件。
图5 Log Miner抓取数据⽰意图
在处理DML事件时,如果表中存在BOLB/CLOB列,CDL同样可以提供⽀持。对于BOLB列的处理,关键点处理如下:
当insert/update操作发⽣时,会触发⼀系列的LOB_WRITE操作。
LOB_WRITE⽤于将⽂件加载到BLOB字段中。
每个LOB_WRITE只能写⼊1KB数据。
对于⼀个1GB的图⽚⽂件,我们会整理全部的100万个LOB_WRITE操作中的⼆进制数据,然后合并成⼀个对象。我们会把这个对象存储到Huawei OBS中,最终在写⼊Kafka的message中给出该对象在OBS中的位置。
对于DDL事件的捕获,我们创建单独的会话来持续跟踪。当前⽀持的DDL语句如下:
No DDL语句⽰例
1CREATE TABLE CREATE TABLE TEST ( EMPID INT PRIMARY KEY, ENAME VARCHAR2(10))
2ALTER TABLE ... ADD ( )ALTER TABLE TEST ADD ( SALARY NUMBER)
3ALTER TABLE ... DROP COLUMN ...ALTER TABLE TEST DROP (SALARY)
4ALTER TABLE ... MODIFY ( ...ALTER TABLE TEST MODIFY SALARY INT
5ALTER ... ALTER TABLE TEST RENAME TO CUSTOMER
6DROP ...DROP TABLE TEST
7CREATE UNIQUE INDEX ...CREATE UNIQUE INDEX TESTINDEX ON TEST (EMPID, ENAME)
8DELETE INDEX …Delete existing index
No DDL语句⽰例
表2 ⽀持的DDL语句
4.3.2 MYSQL Source Connector
MYSQL的Binary Log(Bin Log)⽂件顺序记录了所有提交到数据库的操作,包括了对表结构的变更和对表数据的变更。MYSQL Source Connector通过读取Bin Log⽂件,⽣产CDC事件并提交到Kafka的Topic中。
MYSQL Source Connector主要⽀持的功能场景有:
捕获DML事件,并且⽀持并⾏处理所捕获的DML事件,提升整体性能
⽀持表过滤
⽀持配置表和Topic的映射关系
为了保证CDC事件的绝对顺序,我们⼀般要求⼀张表只对应⼀个Partition,但是,MYSQL Source Connector仍然提供了写⼊多Partition的能⼒,来满⾜某些需要牺牲消息保序性来提升性能的场景
提供基于指定Bin Log⽂件、指定位置或GTID来重启任务的能⼒,保证异常场景下数据不丢失
⽀持多种复杂数据类型
⽀持捕获DDL事件
4.3.3 PostgreSQL Source Connector
PostgreSQL的逻辑解码特性允许我们解析提交到事务⽇志的变更事件,这需要通过输出插件来处理这些变更。PostgreSQL Source Connector使⽤pgoutput插件来完成这项⼯作。pgoutput插件是PostgreSQL 10+提供的标准逻辑解码插件,⽆需安装额外的依赖包。PostgreSQL Source Connector和MYSQL Source Connector除了部分数据类型的区别外其他功能基本⼀致。
4.4 Sink Connectors
MRS提供多种Sink Connector,可以从Kafka中拉取数据并推送到不同的⽬标存储中。现在⽀持的Sink Connector有:HDFS Sink Connector
OBS Sink Connector
Hudi Sink Connector
ClickHouse Sink Connector
Hive Sink Connector
其中Hudi Sink Connector和ClickHouse Sink Connector也⽀持通过Flink/Spark应⽤来调度运⾏。
4.5 表过滤
当我们想在⼀个CDL Job中同时捕获多张表的变更时,我们可以使⽤通配符(正则表达式)来代替表名,即允许同时捕获名称满⾜规则的表的CDC事件。当通配符(正则表达式)不能严格匹配⽬标时,就会出现多余的表被捕获。为此,CDL提供表过滤功能,来辅助通配符模糊匹配的场景。当前CDL同时⽀持⽩名单和⿊名单两种过滤⽅式。
4.6 统⼀数据格式
MRS CDL对于不同的数据源类型如Oracle、MYSQL、PostgreSQL采⽤了统⼀的消息格式存储在Kafka中,后端消费者只需解析⼀种数据格式来进⾏后续的数据处理和传输,避免了数据格式多样导致后端开发成本增加的问题。
4.7 任务级的⽇志浏览
通常境况下,⼀个CDL Connector会运⾏多个Task线程来进⾏CDC事件的抓取,当其中⼀个Task失败时,很难从海量的⽇志中抽取出强相关的⽇志信息,来进⾏进⼀步的分析。
为了解决如上问题,CDL规范了CDL Connector的⽇志打印,并且提供了专⽤的REST API,⽤户可以通过该API⼀键获取指定Connector或者Task的⽇志⽂件。甚⾄可以指定起⽌时间来进⼀步缩⼩⽇志查询的范围。
4.8 监控
MRS CDL提供REST API来查询CDL服务所有核⼼部件的Metric信息,包括服务级、⾓⾊级、实例级以及任务级。
4.9 应⽤程序错误处理
在业务运⾏过程中,常常会出现某些消息⽆法发送到⽬标数据源的情况,我们把这种消息叫做错误记录。在CDL中,出现错误记录的场景有很多种,⽐如:
Topic中的消息体与特定的序列化⽅式不匹配,导致⽆法正常读取
⽬标存储中并不存在消息中所存储的表名称,导致消息⽆法发送到⽬标端
为了处理这种问题,CDL定义了⼀种“dead letter queue”,专门⽤于存储运⾏过程中出现的错误记录。本质上“dead letter queue”是由Sink Connector创建的特定的Topic,当出现错误记录时,由Sink Connector将其发往“dead letter queue”进⾏存储。
同时,CDL提供了REST API来供⽤户随时查询这些错误记录进⾏进⼀步分析,并且提供Rest API可以允许⽤户对这些错误记录进⾏编辑和重发。
图6 CDL Application Error Handling
5 性能
CDL使⽤了多种性能优化⽅案来提⾼吞吐量:
Task并发
我们利⽤Kafka Connect提供的任务并⾏化功能,其中Connect可以将作业拆分为多个任务来并⾏复制数据,如下所⽰:
图7 Task并发
使⽤Executor线程并⾏化执⾏任务
由于Log Miner,Bin Log等数据复制技术的限制,我们的Source Connector只能顺序的捕获CDC事件,因此,为了提⾼性能,我们将这些CDC事件先缓存到内存队列中,然后使⽤Executor线程并⾏的处理它们。这些线程会先从内部队列中读取数据,然后处理并且推送到Kafka中。
图8 Executor线程并发
6 总结
MRS CDL是数据实时⼊湖场景下重要的⼀块拼图,我们仍然需要在数据⼀致性、易⽤性、多组件对接以及性能提升等场景需要进⼀步扩展和完善,在未来能够更好的为客户创造价值。
本⽂由发布。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。