Multi-instance canal: 1 server + 2 instances + 2 clients + 2 MySQL servers
1. Canal application architecture
Components:
1. Linux kernel (CentOS Linux 7), from uname -a:
Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
2. MySQL version:
Server version: 5.6.43-log MySQL Community Server (GPL)
3. canal version: canal-1.1.3
4. JDK version: 1.8
How canal works:
1. canal emulates the MySQL slave interaction protocol, presents itself to the MySQL master as a slave, and sends the dump protocol to the master;
2. the MySQL master receives the dump request and starts pushing its binary log to the slave (i.e. canal);
3. canal parses the binary log objects (raw byte stream).
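To see the binary log that canal will dump, the master can be inspected directly with standard MySQL statements (run on 192.168.175.21 or 192.168.175.22 once MySQL is configured as described in section 2.1 below):
mysql> SHOW MASTER STATUS;                   -- current binlog file and position
mysql> SHOW BINARY LOGS;                     -- binlog files available for dumping
mysql> SHOW VARIABLES LIKE 'binlog_format';  -- must be ROW for canal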
2. Implementation workflow
2.1 MySQL installation and configuration
1. Download and install
Install MySQL on 192.168.175.21 and 192.168.175.22 respectively (the detailed installation steps are covered in a separate article and omitted here).
2. Create the canal account
After creating the root account and enabling remote access, create the canal account and grant it remote access and the required privileges:
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT ALL ON canal.* TO 'canal'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
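To sanity-check the account before moving on, a standard MySQL statement such as the following can be used (not part of the original steps):
mysql> SHOW GRANTS FOR 'canal'@'%';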
3. Verify the login
# remote login
mysql -h 192.168.175.22 -P 3306 -u canal -pcanal
# local login
mysql -ucanal -pcanal
4. Modify the my.cnf configuration
Edit my.cnf on both 175.21 and 175.22 (to locate the file: whereis my.cnf).
Add the following to my.cnf on 192.168.175.21:
log_bin=mysql-bin  # base name of the binlog files; ideally something that identifies the business
binlog_format=row  # row mode, required by canal
server_id=1  # MySQL server id
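The article only lists the 175.21 side. Assuming a symmetric setup, my.cnf on 192.168.175.22 carries the same entries with a different, unique server_id (the value 2 below is an assumption):
log_bin=mysql-bin
binlog_format=row
server_id=2  # must be unique across the replication topology; 2 is just an example
After editing my.cnf, restart mysqld and confirm the format with show variables like 'binlog_format'; (it should report ROW).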
2.2 Canal server configuration and startup
1. Download canal
2. Upload and extract
Log in to 192.168.175.20, upload the package with rz, and extract it to /usr/local/hadoop/app/canal:
tar -zxvf canal.deployer-1.1.3.tar.gz -C /usr/local/hadoop/app/canal
3. Modify the configuration
The freshly extracted directory /usr/local/hadoop/app/canal/conf/ contains a folder named example. Each such folder represents one instance, and one instance is in effect one message queue, so rename this folder to example1 and make a second copy named example2, as shown below (the folders can also be named after the databases they listen to).
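A minimal shell sketch of that step (the paths come from above; the exact commands are not spelled out in the original article):
cd /usr/local/hadoop/app/canal/conf
mv example example1        # instance listening on 192.168.175.21
cp -r example1 example2    # instance listening on 192.168.175.22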
Edit /usr/local/hadoop/app/canal/conf/example1/instance.properties:
canal.instance.master.address=192.168.175.21:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example1
Edit /usr/local/hadoop/app/canal/conf/example2/instance.properties:
canal.instance.master.address=192.168.175.22:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example2
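One more setting worth checking, not shown above: conf/canal.properties controls which instances the server loads. When auto scanning is enabled canal picks up the example1/example2 folders by itself; otherwise the destinations have to be listed explicitly. A sketch of the relevant lines (key names from the canal deployer defaults, values assumed for this setup):
canal.destinations = example1,example2
canal.auto.scan = true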
4. Start the canal server
In /usr/local/hadoop/app/canal/bin, run:
./startup.sh
Check the log /usr/local/hadoop/app/canal/logs/canal/canal.log; output like the following means the server started successfully:
2019-06-07 21:15:03.372 [main] INFO  canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO  RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO  canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO  canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111]
2019-06-07 21:15:22.245 [main] INFO  canal.deployer.CanalStater - ## the canal server is running now ......
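Besides canal.log, the deployer also writes one log per instance under logs/<destination>/. A quick way to confirm that both instances came up (paths assumed from the layout above):
tail -n 20 /usr/local/hadoop/app/canal/logs/example1/example1.log
tail -n 20 /usr/local/hadoop/app/canal/logs/example2/example2.log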
5. Start the canal client
Note: always start the canal server before running the canal client code.
(1) Add the pom dependency
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
(2) canal client code:
package h.canal;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalClientTest {

    public static void main(String[] args) {
        // create the connection (use "example2" for the second instance)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.175.20", 11111), "example1", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");          // subscribe to all tables in all schemas
            // connector.subscribe("canal.t_canal"); // or only table t_canal in schema canal
            int totalEmptyCount = 1200;
            while (emptyCount < totalEmptyCount) {   // in production this is usually an endless loop
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount); // no new changes in the database right now
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);          // acknowledge the batch
                // connector.rollback(batchId);  // on processing failure, roll the batch back instead
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            System.out.println("rowChare ======>" + rowChage.toString());
            EventType eventType = rowChage.getEventType(); // event type, e.g. INSERT / UPDATE / DELETE
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
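Because this setup runs two instances and two clients, one simple variation (purely illustrative, not part of the original code) is to read the destination from the command line and start the same program twice, once per instance. The connector construction in main above would become:
// hypothetical tweak: pick the instance ("example1" or "example2") from args
String destination = args.length > 0 ? args[0] : "example1";
CanalConnector connector = CanalConnectors.newSingleConnector(
        new InetSocketAddress("192.168.175.20", 11111), destination, "", "");
Running java CanalClientTest example1 and java CanalClientTest example2 in two shells then consumes both MySQL servers in parallel.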
Sample canal client output:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
6. Trigger a database change
Create a database: create database canal;
Create a table: create table t_canal (id int, name varchar(20), status int);
Insert a row: insert into t_canal values(10,'hello',1);
canal client output:
================> binlog[mysql-bin.000001:6764] , name[canal,t_canal] , eventType : INSERT
id : 10    update=true
name : hello    update=true
status : 1    update=true
3. Q&A: I configured a table filter, so why does it seem to have no effect?
Answer: First read the AdminGuide to understand how canal.instance.filter.regex is written. It is a Perl regular expression over the tables whose changes canal should parse; multiple expressions are separated by commas (,), and the escape character is a double backslash (\\).
Common examples:
1. All tables: .* or .*\\..*
2. All tables under the canal schema: canal\\..*
3. Tables under canal whose names start with canal: canal\\.canal.*
4. A single table under the canal schema: canal.test1
5. Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma separated)
Also check the binlog format: the filter only applies to row-format data (mixed/statement events are not parsed as SQL, so the table name cannot be extracted reliably for filtering).
Finally, check whether your CanalConnector calls subscribe(filter). If it does, that filter must match canal.instance.filter.regex in instance.properties; otherwise the filter passed to subscribe() overrides the instance configuration, and if that filter is .*\\..*, you are effectively consuming every change. [Important]
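As a concrete illustration of that answer (the property key comes from the AdminGuide; the filter value matches the canal.t_canal table created earlier and is otherwise an assumption): put the filter on the server side in instance.properties and call subscribe() with no argument on the client, so it does not override the server-side regex:
# conf/example1/instance.properties
canal.instance.filter.regex=canal\\.t_canal
// in CanalClientTest, instead of connector.subscribe(".*\\..*")
connector.subscribe();   // no filter argument, so the server-side regex stays in effect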