基于Canal、MySQL8、ES6实现数据ETL和⽗⼦⽂档关联搜索
⼀、概述前阵⼦接到⼀个新任务,要建设⼀个商家商品搜索系统,能够为⽤户提供快速、准确的搜索能⼒,在⽤户输⼊搜索内容时,要能从商家名称和商品名称两个维度去搜索,搜索出来的结果,按照准确率排序,并按商家所属商品的关联关系,来组合数据结构,同时提供API 给业务系统调⽤。
本⽂主要介绍如何使⽤Canal、MySQL8、ElasticSearch6等技术,实现数据从不同系统的数据库实时同步到ES中,并且如何使⽤Canal 来实现多表关联、⽗⼦⽂档的配置,以达到数据结构要求。
⼆、系统架构设计思路为了设计出合适的系统架构,我们分析了现状。
⾸先,商家数据和商品数据分别存储在2个独⽴的MySQL8数据库,为满⾜商家数据和商品数据的关联,我们需要将两个库中所需要的表实时ETL到我们的搜索系统数据库。
其次,数据从商家、商品数据库ETL到搜索系统数据库后,需要实时的组合成为商家关联商品数据结构,并以⽗⼦⽂档的格式,存储到ES 中。
最后,商家、商品数据库的增删改操作,需要实时的同步到ES中,也就是ES中的数据,需要⽀持实时的增加、删除和修改。
为此,我们设计了2个canal组件,第⼀个canal实现数据ETL,把商家、商品数据库的某些表及字段,抽取到搜索服务数据库;再利⽤第⼆个canal,读取搜索服务MySQL数据库的binlog,实时传输到kafka消息队列,再由canal adapter对数据进⾏关联、⽗⼦⽂档映射等,将处理好的数据存储到ElasticSearch中。
三、项⽬实战1、环境及软件说明操作系统:CentOS 7
canal:canal.adapter-1.1.4,canal.deployer-1.1.4
kafka:kafka_2.12-2.3.0
ElasticSearch:elasticsearch-6.3.2
kibana:kibana-6.3.2
2、利⽤Canal实现数据ETL到MySQL8这个步骤是利⽤canal从2个独⽴的MySQL8数据库中,抽取需要的表到搜索服务的MySQL数据库。
2.1 安装canaldeployer(1)解压canal.deployer-1.1.
(2)配置canal deployer
进⼊canaldeployer/conf⽬录,修改canal.properties⽂件,主要配置serverMode、MQ和destination三部分。
⾸先,我们serverMode修改为kafka模式,增加系统缓冲能⼒以及提⾼系统稳定性:serverMode接着,配置kafka的MQ信息(kafka
请⾃⾏安装):kafka MQ信息最后,配置需要实例化的instance,这⾥配置了3个,表⽰canal deploy会启动这3个实例,同步
MySQL的binlog到kafka的topic内。如下图所⽰:destinations实例配置(3)配置canal deployer instance
进⼊canaldeployer/conf/example⽬录,发现有⼀个instance.properties⽂件,这是canal给的⽰例,我们可以参考其配置。
①我们拷贝整个example⽬录,并重命名为上个步骤配置的destination之⼀,如xxxsearch;
②进⼊xxxsearch⽬录,编辑instance.properties⽂件,主要配置源数据库信息、所需数据表及字段,以及指定kafka的topic名,这样源
数据库的binlog就会转换为json数据,并实时的通过canal deployer传输到kafka该topic中。如下所⽰:canaldeploy instance 源数
据库配置canaldeploy instance kafka topic配置③进⼊canaldeployer/bin⽬录,执⾏./startup.sh,启动canal deployer及所属实例。
⾄此canal deployer搭建完成。
2.2 安装canal.adapter我们需要利⽤canal.adapter将kafka topic中的binlog json数据,经过清洗转换等操作,存储到MySQL8中。由于canal原⽣是不⽀持MySQL8的,故我们需要做⼀些调整。
(1)增加MySQL8连接驱动
解压canal.adapter-1.1.,进⼊canaladapter/lib⽬录,移除mysql-connector-java-5.1.40.jar,导⼊mysql-connector-java-8.0.18.jar
(2)配置canal adapter,使数据输出到MySQL8。
进⼊canaladapter/conf⽬录,编辑l⽂件,主要配置消费kafka、源数据库信息和搜索系统数据库信息,如下所⽰:
ETL到MySQL8配置接着,进⼊canaladapter/conf/rdb⽬录,以官⽅提供的l为例,配置kafka topic名、源数据库名、
源数据表名,以及⽬标数据库名和⽬标数据表名,建议⼀张表对应⼀个yml⽂件。ETL表结构映射配置(3)启动canaladapter
进⼊canaladapter/bin⽬录,执⾏./startup.sh,启动canal adapter,观察logs/adapter/adapter.log⽇志⽂件,⼿动在搜索系统数据库
新增⼀条记录,看是否会打印如下⽇志,即有2条记录,⼀条INFO,⼀条DEBUG,则表⽰配置成功。canaladapter⽇志⾄此,数据ETL阶段搭建完成,数据可从两个不同的MySQL8数据库,实时同步到搜索服务的MySQL数据库。
3、实现数据多表关联、⽗⼦⽂档映射(1)配置第⼆个canal的canaladapter
mysql文档手机版
进⼊canaladapter/conf⽬录,编辑l⽂件,主要配置消费kafka、搜索系统数据库,和ES连接信息,如下所⽰:canaladapter MQ及mysql配置
canaladapter ES配置(2)配置多表关联
进⼊canaladapter/conf/es⽬录,vim l,编辑多表关联配置:多表关联配置注意,sql⽀持多表关联⾃由组合, 但是有⼀定的限制:
(a)主表不能为⼦查询语句
(b)只能使⽤left outer join即最左表⼀定要是主表
(c)关联从表如果是⼦查询不能有多张表
(d)主sql中不能有where查询条件(从表⼦查询中可以有where条件但是不推荐, 可能会造成数据同步的不⼀致, ⽐如修改了where条件中的字段内容)
(e)关联条件只允许主外键的'='操作不能出现其他常量判断⽐如: le_id=b.id and b.statues=1
(f)关联条件必须要有⼀个字段出现在主查询语句中⽐如: le id=b.id 其中的 a.role id 或者 b.id 必须出现在主select语句中
(g)Elastic Search的mapping 属性与sql的查询值将⼀⼀对应(不⽀持 select *), ⽐如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将映射到mapping的_email field, 这⾥以别名(如果有别名)作为最终的映射字段. 这⾥的_id可以填写到配置⽂件的 _id: _id映射.
(3)配置⽗⼦⽂档
以官⽅的l为例,vim l,配置⽗⼦⽂档映射:配置⽗⼦⽂档映射(4)在ElasticSearch6中,建⽴index和⽗⼦⽂档映射关系
进⼊kibana页⾯,点击Dev Tools,执⾏如下命令,即可建⽴索引及⽗⼦⽂档映射:建⽴index和⽗⼦⽂档映射其中,ES6和kibana的安装,在此⽆特别配置,不做赘述。
(5)启动canal adapter
进⼊canaladapter/bin⽬录,执⾏./startup.sh,启动canal adapter,观察logs/adapter/adapter.log⽇志⽂件,⼿动在搜索系统数据库
新增⼀条记录,看是否会打印如下⽇志,如打印则表⽰配置成功。正确配置adapter⽇志⽰例4、运⾏结果现在,我们可以通过kibana来执⾏DSL语句来查询看看。
我们事先已在商家系统中增加了⼀个“肯德基”商店,然后在商品系统中添加了“西红柿”和”新鲜西红柿“2个商品,并将商品关联
到“肯德基”上。接着我们查询”肯德基“或者“西红柿”,得到以下是查询的结果(去除了ES默认字段):通过DSL查询的结果由图可见,我们可以通过商家名查询商品,也可通过商品名查询商店和商品,并且canal⽀持数据的实时增删改,所以ES的数据也会与商家系统和商品系统保持⼀致,同时数据结构包含商家及对应的商品,满⾜业务需求。
5、总结⾄此,基于Canal、kafka、MySQL8、ElasticSearch6技术的商家商品搜索系统基础框架搭建完成。我们采⽤canal deployer实时读取商家、商品系统的MySQL数据库binlog,并发送⾄kafka,接着由canal adapter消费kafka,并将binlog json数据进⾏多表关联、⽗⼦⽂档映射,最后存储到ES6中,供上层搜索服务调⽤。
本⽂由博客发⼀⽂多发等运营⼯具平台 发布

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。