Mysql实时同步到⼤数据数仓
如何能够做到数据的实时同步呢?我们想到了MySQL主从复制时使⽤的binlog⽇志,它记录了所有的 DDL 和 DML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执⾏的消耗时间下⾯来看⼀下MySQL主从复制的原理,主要有以下⼏个步骤:
1. master(主库)在每次准备提交事务完成数据更新前,将改变记录到⼆进制⽇志(binary log)中
2. slave(从库)发起连接,连接到master,请求获取指定位置的binlog⽂件
3. master创建dump线程,推送binlog的slave
4. slave启动⼀个I/O线程来读取主库上binary log中的事件,并记录到slave⾃⼰的中继⽇志(relay log)中
5. slave还会起动⼀个SQL线程,该线程从relay log中读取事件并在备库执⾏,完成数据同步
6. slave记录⾃⼰的binlog
7.
binlog记录了Mysql数据的实时变化,是数据同步的基础,服务需要做的就是遵守Mysql的协议,将⾃⼰伪装成Mysql的slave来监听业务从库,完成数据实时同步。 MySQL默认没有开启使⽤binlog,且mac安装默认没有myf⽂件,因此需要⾃⼰在/etc⽬录下新建⽂件并添加相应的配置
⼀、可以查看本地MySQL的binlog开启情况
# 登录
mysql -uroot -p
# 查询binlog开启情况
show variables like 'log_bin’;
查询结果如下,是未开启的
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
⼆、添加myf配置⽂件,内容如下
[mysqld]
#log_bin
log-bin = mysql-bin #开启binlog,表⽰mysql的binlog是打开的,并且使⽤了默认路径
binlog-format = ROW #选择row模式
server_id = 1 #配置mysql replication需要定义,不能和canal的slaveId重复
然后通过系统偏好设置重启MySQL,再使⽤show variables like 'log_bin';查看开启情况,OK,已经开启
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)
三、查看⽇志⽂件
mysql > show master logs;
+------------------+------------+
| Log_name | File_size |
+------------------+------------+
| mysql-bin.000001 | 177 |
| mysql-bin.000002 | 479 |
| mysql-bin.000003 | 2975 |
注:以上就是⽇志⽂件的名字等信息。
四、查看默认路径:
mysql > show variables like 'log_%';
| Variable_name | Value |
+----------------------------------------+----------------------------------------+
| log_bin | ON |
| log_bin_basename | /usr/local/mysql/data/mysql-bin |
| log_bin_index | /usr/local/mysql/data/mysql-bin.index
注:可以看出,我们的log⽂件是存储在/usr/local/mysql/data下的
五、查看⽇志⽂件
执⾏可进⾏查看⽇志:
mysqlbinlog --no-defaults mysql-bin.000001
注:如果不加--no-defaults,会提⽰错误:mysqlbinlog: [ERROR] unknown variable 'default-character-set=utf8',应该是字符集的问题。
六、下载并部署Maxwell
wget github/zendesk/maxwell/releases/download/v1.14.1/maxwell-1.14.
tar -zxvf maxwell-1.14.
七、配置mysql ⽤户并配置访问权限
mysql> create database maxwell;
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;
⼋、启动maxwell (kafka 服务)
bin/maxwell --user='maxwell' --password='hge123!@#' --host='huogenet' --producer=kafka --kafka.bootstrap.servers=devbox3:9092,devbox4:9092,devbox
如果kafka 版本与⾃⼰集版本不⼀致,需要执⾏以下步骤:
kafka client 对应的jar copy 到 maxwell-1.14.1/lib/kafka-clients ⽬录下
cp kafka-clients-0.10.2-kafka-2.2.0.jar /home/bigdata/maxwell/maxwell-1.14.1/lib/kafka-clients/
使⽤以下命令启动
bin/maxwell --user='maxwell' --password='hge123!@#' --host='huogenet' --producer=kafka --kafka.bootstrap.servers=devbox3:9092,devbox4:9092,devbox
九、启动kafka消费
kafka-console-consumer --bootstrap-server devbox3:9092 --topic test
kafka ⽇志⽇志如下:
insert 操作
{
"database": "test",
"table": "users",
"type": "insert",
"ts": 1573813955,
"xid": 519,
"commit": true,
"data": {
"id": 4,
"user_name": "⼩王2",
"emails": "aa@email,bb@email,cc@email" }
}
delete 操作
{
"database": "test",
"table": "bb_user",
"type": "delete",
"ts": 1573814395,
"xid": 725,
"commit": true,
"data": {
"id": 1,
"username": "4",
"nickname": "5",
"password": "6",
"avatar": "7",
"updatetime": "2019-10-10 18:44:06",
"timestamp": "2019-10-10 18:44:09"
}
}
update 操作
{
"database": "test",
"table": "bb_user",
"type": "update",
"ts": 1573814047,
"xid": 604,
"commit": true,
"data": {
"id": 1,
"username": "4",
"nickname": "5",
"password": "6",
"avatar": "7",
"updatetime": "2019-10-10 18:44:06",
"timestamp": "2019-10-10 18:44:09"
},
"old": {
"username": "2",
"nickname": "3",
"password": "4",
"avatar": "5"
}
}
drop table 操作
{
"type": "table-drop",
"database": "test",
"table": "bb_user",
"ts": 1573814543000,
"sql": "DROP TABLE `bb_user` /* generated by server */"mysql下载jar包
}
create table 操作
{
"type": "table-create",
"database": "test",
"table": "ppp_copy1",
"def": {
"database": "test",
"charset": "utf8",
"table": "ppp_copy1",
"primary-key": [],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "varchar",
"name": "pname",
"charset": "utf8"
},
{
"type": "varchar",
"name": "subject",
"charset": "utf8"
},
{
"type": "int",
"name": "points",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "int",
"name": "sex",
"signed": true
}
]
},
"ts": 1573814589000,
"sql": "CREATE TABLE `ppp_copy1` (\n `id` int(11) DEFAULT NULL,\n `pname` varchar(255) DEFAULT NULL,\n `subject` varchar(255) DEFAULT NULL,\n }
⼗、如何保障数据正确性
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论