⼲货Debezium实现Mysql到Elasticsearch⾼效实时同步
Elasticsearch 最少必要知识实战教程直播回放
题记
来⾃Elasticsearch中⽂社区的问题——
MySQL中表⽆唯⼀递增字段,也⽆唯⼀递增时间字段,该怎么使⽤logstash实现MySQL实时增量导数据到es中?
logstash和kafka_connector都仅⽀持基于⾃增id或者时间戳更新的⽅式增量同步数据。
回到问题本⾝:如果库表⾥没有相关字段,该如何处理呢?
本⽂给出相关探讨和解决⽅案。
1、 binlog认知
1.1 啥是 binlog?
binlog是Mysql sever层维护的⼀种⼆进制⽇志,与innodb引擎中的redo/undo log是完全不同的⽇志;
其主要是⽤来记录对mysql数据更新或潜在发⽣更新的SQL语句,并以"事务"的形式保存在磁盘中;
作⽤主要有:
1)复制:达到master-slave数据⼀致的⽬的。
2)数据恢复:通过mysqlbinlog⼯具恢复数据。
3)增量备份。
1.2 阿⾥的Canal实现了增量Mysql同步
⼀图胜千⾔,canal是⽤java开发的基于数据库增量⽇志解析、提供增量数据订阅&消费的中间件。
⽬前,canal主要⽀持了MySQL的binlog解析,解析完成后才利⽤canal client ⽤来处理获得的相关数据。⽬的:增量数据订阅&消费。综上,使⽤binlog可以突破logstash或者kafka-connector没有⾃增id或者没有时间戳字段的限制,实现增量同步。
2、基于binlog的同步⽅式
由于已经部署过conluent(kafka的企业版本,⾃带zookeeper、kafka、ksql、kafka-connector等),
本⽂仅针对Debezium展开。
3、Debezium介绍
Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度⾮常快。
特点:
1)简单。⽆需修改应⽤程序。可对外提供服务。
2)稳定。持续跟踪每⼀⾏的每⼀处变动。
3)快速。构建于kafka之上,可扩展,经官⽅验证可处理⼤容量的数据。
4、同步架构
如图,Mysql到ES的同步策略,采取“曲线救国”机制。
步骤1: 基Debezium的binlog机制,将Mysql数据同步到Kafka。
步骤2: 基于Kafka_connector机制,将kafka数据同步到Elasticsearch。
5、Debezium实现Mysql到ES增删改实时同步
软件版本:
confluent:5.1.2;
Debezium:0.9.2_Final;
Mysql:
Elasticsearch:6.6.1
5.1 Debezium安装
Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件⽬录(share/java)中。注意重启⼀下confluent,以使得Debezium⽣效。
5.2 Mysql binlog等相关配置。
Debezium使⽤MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。
核⼼配置如下,在Mysql机器的/etc/myf的mysqld下添加如下配置。
[mysqld]
server-id = 223344
log_bin = mysql-bin
mysql下载jar包binlog_format = row
binlog_row_image = full
expire_logs_days = 10
然后,重启⼀下Mysql以使得binlog⽣效。
systemctl start mysqld.service
5.3 配置connector连接器。
配置confluent路径⽬录 : /etc
创建⽂件夹命令 :
mkdir kafka-connect-debezium
在mysql2kafka_debezium.json存放connector的配置信息 :
[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
{
"name" : "debezium-mysql-source-0223",
"config":
{
"connector.class" : "sql.MySqlConnector",
"database.hostname" : "192.168.1.22",
"database.port" : "3306",
"database.user" : "root",
"database.password" : "XXXXXX",
"database.whitelist" : "kafka_base_db",
"table.whitlelist" : "accounts",
"database.server.id" : "223344",
"database.server.name" : "full",
"database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
"database.pic" : "account_topic",
"include.schema.changes" : "true" ,
"lumn.name" : "id",
"database.history.skip.unparseable.ddl" : "true",
"transforms": "unwrap,changetopic",
"pe": "ansforms.UnwrapFromEnvelope",
"pe":"org.t.transforms.RegexRouter",
"":"(.*)",
"placement":"$1-smt"
}
}
注意如下配置:
1. “database.server.id”,对应Mysql中的server-id的配置。
2. “database.whitelist” : 待同步的Mysql数据库名。
3. “table.whitlelist” :待同步的Mysq表名。
4. 重要:“database.pic”:存储数据库的Shcema的记录信息,⽽⾮写⼊数据的topic、
5. “database.server.name”:逻辑名称,每个connector确保唯⼀,作为写⼊数据的kafka topic的前缀名称。
坑⼀:transforms相关5⾏配置作⽤是写⼊数据格式转换。
如果没有,输⼊数据会包含:before、after记录修改前对⽐信息以及元数据信息(source,op,ts_ms等)。
这些信息在后续数据写⼊Elasticsearch是不需要的。(注意结合⾃⼰业务场景)。
5.4 启动connector
curl -X POST -H "Content-Type:application/json"
--data @mysql2kafka_debezium.json.json
192.168.1.22:18083/connectors | jq
5.5 验证写⼊是否成功。
查看kafka-topic
kafka-topics --list --zookeeper localhost:2181
此处会看到写⼊数据topic的信息。
注意新写⼊数据topic的格式:database.schema.table-smt 三部分组成。
本⽰例topic名称:full.kafka_base_db.account-smt。
消费数据验证写⼊是否正常
./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning
⾄此,Debezium实现mysql同步kafka完成。
6、kafka-connector实现kafka同步Elasticsearch
6.1、Kafka-connector介绍
Kafka Connect是⼀个⽤于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和⽂件系统)的框架。
连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写⼊Kafka,或者Kafka数据写⼊⽬标数据库,也可以⾃⼰开发连接器。
6.2、kafka到ES connector同步配置
配置路径:
/
home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
配置内容:
"connector.class": "t.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "full.kafka_base_db.account-smt",
"key.ignore": "true",
"connection.url": "192.168.1.22:9200",
"type.name": "_doc",
"name": "elasticsearch-sink-test"
6.3 kafka到ES启动connector
启动命令
confluent load elasticsearch-sink-test
-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
6.4 Kafka-connctor RESTFul API查看
Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令⾏查看。
curl -X GET localhost:8083/connectors
7、坑复盘。
坑2: 同步的过程中可能出现错误,⽐如:kafka topic没法消费到数据。
排解思路如下:
1)确认消费的topic是否是写⼊数据的topic;
2)确认同步的过程中没有出错。可以借助connector如下命令查看。
curl -X GET localhost:8083/connectors-xxx/status
坑3: Mysql2ES出现⽇期格式不能识别。
是Mysql jar包的问题,解决⽅案:在myf中配置时区信息即可。
坑4: kafka2ES,ES没有写⼊数据。
排解思路:
1)建议:先创建同topic名称⼀致的索引,注意:Mapping静态⾃定义,不要动态识别⽣成。
2)通过connetor/status排查出错原因,⼀步步分析。
8、⼩结
binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。
对⽐:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。推荐⼤家使⽤。⼤家有好的同步⽅式也欢迎留⾔讨论交流。
铭毅天下——Elasticsearch基础、进阶、实战第⼀
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论