腾讯⼤⽜教你ClickHouse实时同步MySQL数据作者史鹏宙 CSIG云与智慧产业事业研发⼯程师
ClickHouse作为OLAP分析引擎已经被⼴泛使⽤,数据的导⼊导出是⽤户⾯临的第⼀个问题。由于ClickHouse本⾝⽆法很好地⽀持单条⼤批量的写⼊,因此在实时同步数据⽅⾯需要借助其他服务协助。本⽂给出⼀种结合Canal+Kafka的⽅案,并且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写⼊同⼀张ClickHouse表的⽅法,欢迎⼤家批评指正。
⾸先来看看我们的需求背景:
1. 实时同步多个MySQL实例数据到ClickHouse,每天规模500G,记录数⽬亿级别,可以接受分钟级别的同步延迟;
2. 某些数据库表存在分库分表的操作,⽤户需要跨MySQL实例跨数据库的表同步到ClickHouse的⼀张表中;
3. 现有的MySQL binlog开源组件(Canal),⽆法做到多张源数据表到⼀张⽬的表的映射关系。
基本原理
⼀、使⽤JDBC⽅式同步
1. 使⽤Canal组件完成binlog的解析和数据同步;
2. Canal-Server进程会伪装成MySQL的slave,使⽤MySQL的binlog同步协议完成数据同步;
3. Canal-Adapter进程负责从canal-server获取解析后的binlog,并且通过jdbc接⼝写⼊到ClickHouse;
优点:
1. Canal组件原⽣⽀持;
缺点:
1. Canal-Adpater写⼊时源表和⽬的表⼀⼀对应,灵活性不⾜;
2. 需要维护两个Canal组件进程;
⼆、Kafka+ClickHouse物化视图⽅式同步
1. Canal-Server完成binlog的解析,并且将解析后的json写⼊Kafka;
2. Canal-Server可以根据正则表达式过滤数据库和表名,并且根据规则写⼊Kafka的topic;
3. ClickHouse使⽤KafkaEngine和Materialized View完成消息消费,并写⼊本地表;
优点:
1. Kafka⽀持⽔平扩展,可以根据数据规模调整partition数⽬;
2. Kafka引⼊后将写⼊请求合并,防⽌ClickHouse⽣成⼤量的⼩⽂件,从⽽影响查询性能;
3. Canal-Server⽀持规则过滤,可以灵活配置上游的MySQL实例的数据库名和表名,并且指明写⼊的Kafka topic名称;
缺点:
1. 需要维护Kafka和配置规则;
2. ClickHouse需要新建相关的视图、Kafka Engine的外表等;
具体步骤
⼀、准备⼯作
1. 如果使⽤TencentDB,则在控制台确认binlog_format为ROW,⽆需多余操作。
如果是⾃建MySQL,则在客户端中查询变量:
>  show variables like '%binlog%';
+-----------------------------------------+----------------------+
| Variable_name                          | Value                |
+-----------------------------------------+----------------------+
| binlog_format                          | ROW                  |
+-----------------------------------------+----------------------+
> show variables like '%log_bin%';
+---------------------------------+--------------------------------------------+
| Variable_name                  | Value                                      |
+---------------------------------+--------------------------------------------+
| log_bin                        | ON                                        |
| log_bin_basename                |  /data/mysql_root/log/20146/mysql-bin        |
| log_bin_index                  |  /data/mysql_root/log/20146/mysql-bin.index |
+---------------------------------+--------------------------------------------+
2. 创建账号canal,⽤于同步binlog
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%'; FLUSH PRIVILEGES;
⼆、Canal组件部署
前置条件:
Canal组件部署的机器需要跟ClickHouse服务和MySQL⽹络互通;
需要在机器上部署java8,配置JAVA_HOME、PATH等环境变量;
基本概念:
1. Canal-Server组件部署
Canal-Server的主要作⽤是订阅binlog信息并解析和定义instance相关信息,建议每个Canal-Server进程对应⼀个MySQL实例;1)下载canal.deployer-1.1.,解压
2)修改配置⽂件conf/canal.properties,需要关注的配置如下:
...
# 端⼝相关信息,如果同⼀台机器部署多个进程需要修改
canal.port = 11111
canal.admin.port = 11110
...
# 服务模式
canal.serverMode = tcp
...
# Kafka地址
canal.mq.servers = 172.21.48.11:9092
# 使⽤消息队列时这两个值必须为true
canal.mq.flatMessage = true
canal.lyData = true
...
# instance列表,conf⽬录下必须有同名的⽬录
canal.destinations = example,example2
3)配置instance
可以参照example新增新的instance,主要修改配置⽂件conf/${instance_name}/instance.properties⽂件。
样例1: 同步某个数据库的以XX前缀开头的表
订阅 172.21.48.35的MySQL的testdb数据库中的以tb_开头的表的数据变更(例如tb_20200801 、 tb_20200802等),主要的步骤如下:
步骤1:创建example2实例:cddeployer/conf && cp -r example example2
步骤2:修改deployer/conf/example2/instance.properties⽂件
...
# 上游MySQL实例地址
canal.instance.master.address=172.21.48.35:3306
...
# 同步账户信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# 过滤数据库名称和表名
canal.=testdb\\.tb_.*,
步骤3:在conf/canal.properties中修改 canal.destinations ,新增example2
样例2: 同步多个数据库的以XX前缀开头的表,且输出到Kafka
订阅 172.21.48.35的MySQL的empdb_0数据库的employees_20200801表,empdb_1数据库的employees_20200802表,并且数据写⼊Kafka;步骤1:创建example2实例:cddeployer/conf && cp -r example example3
步骤2:修改deployer/conf/example3/instance.properties⽂件
...
# 上游MySQL实例地址
canal.instance.master.address=172.21.48.35:3306
...
# 同步账户信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# 过滤数据库名称和表名
canal.=empdb_.*\\.employees_.*
...
# Kafka的topic名称和匹配的规则
canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*
canal.mq.partition=0
# Kafka topic的分区数⽬(即partition数⽬)
canal.mq.partitionsNum=3
# 根据employees_开头的表中的 emp_no字段来进⾏数据hash,分布到不同的partition
canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no
步骤3:在Kafka中新建topic employees_topic,指定分区数⽬为3
步骤4:在conf/canal.properties中修改 canal.destinations ,新增example3;修改服务模式为kafka,配置kafka相关信息;
# 服务模式
canal.serverMode = kafka
...
# Kafka地址
canal.mq.servers = 172.21.48.11:9092
# 使⽤消息队列时这两个值必须为true
canal.mq.flatMessage = true
canal.lyData = true
...
mysql下载配置
# instance列表,conf⽬录下必须有同名的⽬录
canal.destinations =  example,example2,example3
2. Canal-Adapter组件部署(只针对⽅案⼀)
Canal-Adapter的主要作⽤是通过JDBC接⼝写⼊ClickHouse数据,可以配置多个表的写⼊;
1)下载canal.adapter-1.1.,解压;
2)在lib⽬录下新增clickhouse驱动jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;3)修改配置⽂件l⽂件,修改canalServerHost、srcDataSources、canalAdapters的配置;
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH  ss
time-zone: GMT+8
default-property-inclusion: non_null
mode: tcp
canalServerHost: 127.0.0.1:11111  # canal-server的服务地址
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
#  MySQL的配置,修改⽤户名密码及制定数据库
srcDataSources:
defaultDS:
url: jdbc:mysql://172.21.48.35:3306
username: root
password: yourpasswordhere
canalAdapters:
-  instance: example
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
# clickhouse的配置,修改⽤户名密码数据库
properties:
jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。