利⽤Canal完成Mysql数据同步Redis
流程
Canal的原理是模拟Slave向Master发送请求,Canal解析binlog,但不将解析结果持久化,⽽是保存在内存中,每次有客户端读取⼀次消息,就删除该消息。这⾥所说的客户端,就需要我们写⼀个连接Canal的程序,持续从Canal获取数据。
程序写MySQL, 解析binlog,数据放⼊队列写Redis
读Redis
[html]
1. [mysqld]
2. log-bin=mysql-bin #添加这⼀⾏就ok
3. binlog-format=ROW #选择row模式
4. server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
2,在mysql中 配置canal数据库管理⽤户,配置相应权限(repication权限)
[html]
1. CREATE USER canal IDENTIFIED BY 'canal';
2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
4. FLUSH PRIVILEGES;
[html]
1. mkdir /tmp/canal
2. tar zxvf canal.deployer-$  -C /tmp/canal
3,修改配置⽂件
[html]
1. vi conf/example/instance.properties
1. >>>>>>>>>####
2. ## mysql serverId
3. sql.slaveId = 1234
4.
5. # position info,需要改成⾃⼰的数据库信息
6. canal.instance.master.address = 12
7.0.0.1:3306
7. canal.instance.master.journal.name =
8. canal.instance.master.position =
9. canal.instance.master.timestamp =
10.
11. #canal.instance.standby.address =
12. #canal.instance.standby.journal.name =
13. #canal.instance.standby.position =
14. #canal.instance.standby.timestamp =
15.
16. # username/password,需要改成⾃⼰的数据库信息
17. canal.instance.dbUsername = canal
18. canal.instance.dbPassword = canal
19. canal.instance.defaultDatabaseName =
20. tionCharset = UTF-8
21.
22. # table regex
23. canal. = .*\\..*
24.
25. >>>>>>>>>####
【canal启动和关闭】
1,启动
[html]
1. sh bin/startup.sh
2,查看⽇志
[html]
1. vi logs/canal/canal.log
具体instance的⽇志:
1. vi logs/example/example.log
3,关闭
[html]
1. sh bin/stop.sh
注意:
1,这⾥只需要配置好参数后,就可以直接运⾏
2,Canal没有解析后的⽂件,不会持久化
其中⼀个是连接canal并操作的类,⼀个是redis的⼯具类,使⽤maven主要是依赖包的下载很⽅便。
2,ClientSample代码
这⾥主要做两个⼯作,⼀个是循环从Canal上取数据,⼀个是将数据更新⾄Redis  1. <project xmlns="/POM/4.0.0" xmlns :xsi="/2001/XMLSchema-instance" xsi :schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd">
2.  <modelVersion>4.0.0</modelVersion>
3.  <groupId></groupId>
4.  <artifactId>canal.sample</artifactId>
5.  <version>0.0.1-SNAPSHOT</version>
6.  <dependencies>
7.    <dependency>
8.        <groupId></groupId>
9.        <artifactId>canal.client</artifactId>
10.        <version>1.0.12</version>
11.    </dependency>
12.
13.    <dependency>
14.        <groupId>org.springframework</groupId>
15.        <artifactId>spring-test</artifactId>    16.        <version>3.1.2.RELEASE</version>
17.        <scope>test</scope>
18.    </dependency>
19.
20.    <dependency>
21.        <groupId>redis.clients</groupId>
22.        <artifactId>jedis</artifactId>
23.        <version>2.4.2</version>
24.    </dependency>
25.
26.    </dependencies>
27.  <build/>
28.
</project>  [cpp]
1.
package canal.sample;  2.
3.
import java.InetSocketAddress;    4.
import java.util.List;    5.
6.
import com.alibaba.fastjson.JSONObject;  7.
import canal.client.CanalConnector;    8.
import canalmon.utils.AddressUtils;    9.
import canal.protocol.Message;    10.
import canal.protocol.CanalEntry.Column;    11.
import canal.protocol.CanalEntry.Entry;    12.
import canal.protocol.CanalEntry.EntryType;    13.
import canal.protocol.CanalEntry.EventType;    14. import canal.protocol.CanalEntry.RowChange;
14. import canal.protocol.CanalEntry.RowChange;
15. import canal.protocol.CanalEntry.RowData;
16. import canal.client.*;
17.
18. public class ClientSample {
19.
20. public static void main(String args[]) {
21.
22. // 创建链接
23.        CanalConnector connector = wSingleConnector(new HostIp(),
24.                11111), "example", "", "");
25. int batchSize = 1000;
mysql下载后的初次使用26. try {
27.            t();
28.            connector.subscribe(".*\\..*");
29.            llback();
30. while (true) {
31.                Message message = WithoutAck(batchSize); // 获取指定数量的数据
32. long batchId = Id();
33. int size = Entries().size();
34. if (batchId == -1 || size == 0) {
35. try {
36.                        Thread.sleep(1000);
37.                    } catch (InterruptedException e) {
38.                          e.printStackTrace();
39.                    }
40.                } else {
41.                    Entries());
42.                }
43.
44.                connector.ack(batchId); // 提交确认
45. // llback(batchId); // 处理失败, 回滚数据
46.            }
47.
48.        } finally {
49.            connector.disconnect();
50.        }
51.    }
52.
53. private static void printEntry( List<Entry> entrys) {
54. for (Entry entry : entrys) {
55. if (EntryType() == EntryType.TRANSACTIONBEGIN || EntryType() == EntryType.TRANSACTIONEND) {
56. continue;
57.            }
58.
59.            RowChange rowChage = null;
60. try {
61.                rowChage = RowChange.StoreValue());
62.            } catch (Exception e) {
63. throw new RuntimeException("ERROR ## parser of eromanga-
event has an error , data:" + String(),
64.                        e);
65.            }
66.
67.            EventType eventType = EventType();
68.            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。

发表评论