监听mysqlbin⽂件,使⽤Binlog+FlinkCDC实时监控数据⼀、MySql的Binlog
1、什么是Binlog
1)binlog是⼆进制⽇志,并且是事务安全性
2)binlog记录了所有的DDL和DML(除了数据查询语句)语句,并以事件的形式记录,还包含语句所执⾏的消耗的时间
3)⼀般来说开启⼆进制⽇志⼤概会有1%的性能损耗。
2、Binlog使⽤场景
1)使⽤binlog恢复数据
2)在项⽬中动态监听mysql中变化的数据
3、Binlog开启
1)在MySQL的配置⽂件(Linux: /etc/myf , Windows:\my.ini)下,修改配置在[mysqld] 区块设置/添加server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2019
binlog-do-db=gmall2020
binlog-do-db=gmall2021
2)重启mysqlsudo systemctl restart mysqld
4、配置⽂件参数解析
配置机器id
多台机器不能重复server-id=1
开启binloglog-bin=mysql-bin
Binlog分类设置
MySQL Binlog的格式,那就是有三种,分别是STATEMENT,MIXED,ROW。
在配置⽂件中选择配置,⼀般会配置为rowbinlog_format=row
三种分类的区别:
1)statement
语句级,binlog会记录每次⼀执⾏写操作的语句。
相对row模式节省空间,但是可能产⽣不⼀致性,⽐如
update tt set create_date=now()
如果⽤binlog⽇志进⾏恢复,由于执⾏时间不同可能产⽣的数据就不同。
优点:节省空间
缺点:有可能造成数据不⼀致。
2)row(常⽤)
⾏级,binlog会记录每次操作后每⾏记录的变化。
优点:保持数据的绝对⼀致性。因为不管sql是什么,引⽤了什么函数,他只记录执⾏后的效果。
缺点:占⽤较⼤空间。
3)mixed
statement的升级版,⼀定程度上解决了,因为⼀些情况⽽造成的statement模式不⼀致问题在某些情况下譬如:
当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;
执⾏ INSERT DELAYED 语句时;
⽤ UDF 时;
会按照 ROW的⽅式进⾏处理
优点:节省空间,同时兼顾了⼀定的⼀致性。
缺点:还有些极个别情况依旧会造成不⼀致,
另外statement和mixed对于需要对binlog的监控的情况都不⽅便。
设置数据库
设置要监听的数据库,可以同时写⼊多个库binlog-do-db=gmall2021
binlog-do-db=gmall2022
binlog-do-db=gmall2023
⼆、FlinkCDC
1、什么是CDC
CDC是Change Data Capture(变更数据获取)的简称。核⼼思想是,监测并捕获数据库的变动(包括数据或数据表的插⼊、更新以及删除等),将这些变更按发⽣的顺序完整记录下来,写⼊到消息中间件中以供其他服务进⾏订阅及消费。
2、CDC的种类
CDC主要分为基于查询和基于Binlog两种⽅式,我们主要了解⼀下这两种之间的区别:基于查询的CD
C基于Binlog的CDC开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执⾏模式BatchStreaming
是否可以捕获所有数据变化否是
延迟性⾼延迟低延迟
是否增加数据库压⼒是否
3、FlinkCDC
Flink内置了Debezium
FlinkCDC1.11版本正式发布
Canal不⽀持读取全量binlog数据,⽽FlinkCDC完美避开了这个问题
3.CDC案例实操
1)导⼊依赖
org.apache.flink
flink-java
1.1
2.0
org.apache.flink
flink-streaming-java_2.12
1.1
2.0
org.apache.flink
flink-clients_2.12
1.1
2.0
org.apache.hadoop
hadoop-client
3.1.3
mysql
mysql-connector-java
5.1.49
com.alibaba.ververica
flink-connector-mysql-cdc
1.2.0
com.alibaba
fastjson
1.2.75
org.apache.maven.plugins
maven-assembly-plugin
3.0.0
jar-with-dependencies
make-assembly
package
single
2)编写代码package com.haoziqi;
import com.alibaba.sql.MySQLSource;
import com.alibaba.sql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.startstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.vironment.CheckpointConfig;
import org.apache.flink.vironment.StreamExecutionEnvironment;
import java.util.Properties;
/**************************************************************
* @Author: haoziqi
* @Date: Created in 9:27 2021/3/15
* @Description: TODO 使⽤DataStream连接mysql,并监控表中新增的数据 测试通道是否正常:flink读取mysql binlog数据
* 执⾏的时候需要检查对应的库是否存在
* linux中:sudo vim /etc/myf
* 2、执⾏的时候需要运⾏hdfs
* 3、启动mysql,
*
**************************************************************/
public class FlinkCDC1 {
private static Properties properties;
public static void main(String[] args) throws Exception {
//TODO 1.获取流处理执⾏环境
StreamExecutionEnvironment env = ExecutionEnvironment();
env.setParallelism(1);
//1.1Checkpoint相关
/*读取的是binlog中的数据,如果集挂掉,尽量能实现断点续传功能。如果从最新的读取(丢数据)。如果从最开始读(重复数据)。理想状
态:读取binlog中的数据读⼀⾏,保存⼀次读取到的(读取到的⾏)位置信息。⽽flink中读取⾏位置信息保存在Checkpoint中。使⽤Checkpoint可以把flink中读取(按⾏)的位置信息保存在Checkpoint中*/
mysql下载jar包//设置Checkpoint的模式:精准⼀次
//任务挂掉的时候是否清理checkpoint。使任务正常退出时不删除CK内容,有助于任务恢复。默认的是取消的时候清空checkpoint中的数据。RETAIN_ON_CANCELLATION表⽰取消任务的时候,保存最后⼀次的checkpoint。便于任务的重启和恢复,正常情况下都使⽤RETAIN
//设置⼀个重启策略:默认的固定延时重启次数,重启的次数是Integer的最⼤值,重启的间隔是1s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//设置⼀个状态后端 jobManager。如果使⽤的yarn集模式,jobManager随着任务的⽣成⽽⽣成,任务挂了jobManager就没了。因此
需要启动⼀个状态后端。只要设置checkpoint,尽量就设置⼀个状态后端。保存在各个节点都能读取的位置:hdfs中
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/ck/"));
//指定⽤户
System.setProperty("HADOOP_USER_NAME", "atguigu");
//TODO 2.读取mysql变化数据 监控MySQL中变化的数据
Properties properties = new Properties(); //创建⼀个变量可以添加之后想添加的配置信息
DebeziumSourceFunction sourceFunction = MySQLSource.builder() //使⽤builder创建MySQLsource对象,需要指定对象的泛型。
.hostname("hadoop102") //指定监控的哪台服务器(MySQL安装的位置)
.port(3306) //MySQL连接的端⼝号
.username("root") //⽤户
.password("123456")//密码
.databaseList("gmall_flink_0923") //list:可以监控多个库
.tableList("gmall_flink_0923.z_user_info") //如果不写则监控库下的所有表,需要使⽤【库名.表名】
//debezium中有很多配置信息。可以创建⼀个对象来接收
//.debeziumProperties(properties)
.deserializer(new StringDebeziumDeserializationSchema()) //读的数据是binlog⽂件,反序列化器,解析数据
.startupOptions(StartupOptions.initial()) //初始化数据:空值读不读数据库中的历史数据。initial(历史+连接之后的)、latest-
offset(连接之后的)。timestamp(根据指定时间戳作为开始读取的位置)
.build();
DataStreamSource streamSource = env.addSource(sourceFunction);
//TODO 3.打印数据
streamSource.print();
//把上⾯代码注释掉,报错代码
SingleOutputStreamOperator map = streamSource.map(data -> data);
SingleOutputStreamOperator slotgroup = map.slotSharingGroup("123");
slotgroup.print();
//TODO 4.启动任务
}
}
3) 案例测试:
1)打包成带依赖的jar包
2)开启MySQLbinlog并重启Mysql
4) 启动HDFS集+yarnstart-yarn.sh
start-dfs.sh

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。