FlinkCDC读取MySQL的例⼦
前⾯转载了Flink CDC的机制⽂章,原理看起来还是⽐较好理解的,那么接下来很多⼈肯定都跃跃欲试了。
这篇⽂章分享⼀下MySQL作为源端,使⽤Flink SQL如何读取MySQL数据。
注意:如果经验有限,在进⾏所有的软件安装配置操作之前,请先仔细阅读⼀遍,⽬的主要是从⽂档中获取到Flink CDC与Flink版本对照关系及其他软件的⽀持版本。
本⽂章使⽤的软件版本如下:
flink-1.13.3
flink-sql-connector-mysql-cdc-2.0.2.jar
10.3.29-MariaDB-log
openjdk version "11.0.11" 2021-04-20
1.MySQL配置
在进⾏配置之前,⾸先你需要⾃⾏安装MySQL,具体步骤这⾥不展开说了,⼤家如果安装MySQL有困难还请⾃⼰去百度⼀下。
当前Flink CDC官⽅宣称⽀持的MySQL版本信息如下:
Database: 5.7, 8.0.x
JDBC Driver: 8.0.16
我这边测试过程中使⽤的是MariaDB 10,也是⽀持的。
1.1 启⽤MySQL binlog
修改myf⽂件,增加如下信息:
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
binlog_do_db=db_a
binlog_do_db=db_b
配置项的解释如下:
server_id:MySQL5.7及以上版本开启binlog必须要配置这个选项。对于MySQL集,不同节点的server_id必须不同。对于单实例部署则没有要求。
log_bin:指定binlog⽂件名和储存位置。如果不指定路径,默认位置为/var/lib/mysql/。
binlog_format:binlog格式。有3个值可以选择:ROW:记录哪条数据被修改和修改之后的数据,会产⽣⼤量⽇志。
STATEMENT:记录修改数据的SQL,⽇志量较⼩。MIXED:混合使⽤上述两个模式。CDC要求必须配置为ROW。
expire_logs_days:bin_log过期时间,超过该时间的log会⾃动删除。
binlog_do_db:binlog记录哪些数据库。如果需要配置多个库,如例⼦中配置多项。切勿使⽤逗号分隔。
配置⽂件修改完毕后保存并重启MySQL。然后进⼊MySQL命令⾏,验证是否已启⽤binlog:
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
返回结果如果显⽰ON则表⽰binlog启⽤。
1.2 初始化MySQL表
在MySQL中创建测试数据库和测试表demo.products,语句如下:
create database demo;
use demo;
CREATE TABLE `products` (
`id` int(11) NOT NULL,
`name` varchar(20) DEFAULT NULL,
`description` varchar(20) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
PRIMARY KEY (`id`)
) ;
1.3 向MySQL表中插⼊测试数据
INSERT INTO demo.products
(id, name, description, weight)
VALUES(1, 'a', 'a', 20.000);
INSERT INTO demo.products
(id, name, description, weight)
VALUES(2, 'b', 'b', 30.000);
INSERT INTO demo.products
(id, name, description, weight)
VALUES(3, 'c', 'c', 40.000);
INSERT INTO demo.products
(id, name, description, weight)
VALUES(4, 'd', 'd', 50.000);
2.Flink配置
2.1 Flink下载
到下载flink-1.13.3软件包,我这⾥下载的版本如下:
flink-1.13.3-bin-scala_
将下载的软件包解压缩到你习惯使⽤的软件⽬录,我这⾥就直接放在Downloads⽬录下了。
2.2 下载mysql-cdc jar
到flink cdc的下载最新的jar包flink-sql-connector-mysql-cdc-2.0.2.jar。
将下载好的jar包,放到2.1步解压后flink⽂件夹的lib⽬录下。
2.3 启动Flink
进⼊flink/bin⽬录,执⾏[./start-cluster.sh]启动flink测试环境。
3.在Flink SQL中读取MySQL
进⼊flink/bin⽬录,执⾏[./sql-client.sh]启动Flink SQL。3.1 创建Flink SQL 表
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.200',
'port' = '3306',
'username' = 'root',
mysql下载之后是个文件夹
'password' = 'passwd',
'database-name' = 'demo',
'table-name' = 'products'
);
3.2 执⾏查询
SELECT id, UPPER(name), description, weight FROM mysql_binlog;

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。