FlinkCDC将MySQL的数据写⼊Hudi实践
Flink CDC + Hudi实践
⼀、依赖关系
为了设置MySQL CDC连接器,下表提供了使⽤构建⾃动化⼯具(例如Maven或SBT)和带有SQL JAR捆绑包的SQL Client的两个项⽬的依赖项信息。
1、Maven依赖
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
2、SQL客户端JAR
下载flink-sql-connector-mysql-cdc-2.0.0.jar并将其放在下<FLINK_HOME>/lib/。
⼆、设置MySQL服务器
您必须定义⼀个对Debezium MySQL连接器监视的所有数据库具有适当权限的MySQL⽤户。
1、创建MySQL⽤户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
2、向⽤户授予所需的权限:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
3、最终确定⽤户的权限:
mysql> FLUSH PRIVILEGES;
1、MySQL CDC源代码如何⼯作
启动MySQL CDC源时,它将获取⼀个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻⽌其他数据库的写⼊。然后,它读取当前binlog位置以及数据库和表的schema。之后,将释放 全局读取锁。然后,它扫描数据库表并从先前记录的位置读取binlog。Flink将定期执⾏checkpoints以记录binlog位置。如果发⽣故障,作业将重新启动并从checkpoint完成的binlog位置恢复。因此,它保证了仅⼀次的语义。
2、向MySQL⽤户授予RELOAD权限
如果未授予MySQL⽤户RELOAD权限,则MySQL CDC源将改为使⽤表级锁,并使⽤此⽅法执⾏快照。这会阻⽌写⼊更长的时间。
3、全局读取锁(FLUSH TABLES WITH READ LOCK)
全局读取锁 在读取binlog位置和schema期间保持。这可能需要⼏秒钟,具体取决于表的数量。全局读取锁定会阻⽌写⼊,因此它仍然可能影响在线业务。
如果要跳过读取锁,并且可以容忍⾄少⼀次语义,则可以添加'debezium.de' = 'none'选项以跳过锁。
4、为每个作业设置⼀个differnet SERVER ID
每个⽤于读取binlog的MySQL数据库客户端都应具有唯⼀的ID,称为server id。MySQL服务器将使⽤此ID维护⽹络连接和binlog位置。如果不同的作业共享相同的server id,则可能导致从错误的binlog位置进⾏读取。
提⽰:默认情况下,启动TaskManager时,server id是随机的。如果TaskManager失败,则再次启动时,它可能具有不同的server id。但这不应该经常发⽣(作业异常不会重新启动TaskManager),也不会对MySQL服务器造成太⼤影响。
因此,建议为每个作业设置不同的server id ,例如:
通过: SELECT * FROM source_table /+ OPTIONS('server-id'='123456') / ;
通过Stream ApI的 创建source时设置: ** MySQLSource.builder().xxxxxx.serverId(123456);**
重点:Mysq的binlog 可以说是针对库级别,所以相同的server id去拉⼀个库⾥的不同表或者相同表可能会造成数据丢失。所以建议设置server id。(我也有在社区邮件中提问Jark⼤佬:)
5、扫描数据库表期间⽆法执⾏检查点
在扫描表期间,由于没有可恢复的位置,因此我们⽆法执⾏checkpoints。为了不执⾏检查点,MySQL
CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很⼤,则建议添加以下Flink 配置,以避免由于超时检查点⽽导致故障转移:
execution.checkpointing.interval: 10min
lerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
6、设置MySQL会话超时
为⼤型数据库创建初始⼀致的快照时,在读取表时,您建⽴的连接可能会超时。您可以通过在MySQL配置⽂件中配置
Interactive_timeout和wait_timeout来防⽌此⾏为。
interactive_timeout:服务器在关闭交互式连接之前等待活动的秒数。参见。
wait_timeout:服务器在关闭⾮交互式连接之前等待其活动的秒数。参见。
四.Flink cdc 数据⼊湖(Hudi)
1.创建mysql源表
-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`consignee` varchar(100) DEFAULT NULL COMMENT '收货⼈',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件⼈电话',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '总⾦额',
`order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表⽰下单,2表⽰⽀付',
`user_id` bigint(20) DEFAULT NULL COMMENT '⽤户id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款⽅式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三⽅⽀付⽤)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三⽅⽀付⽤)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`operate_time` datetime DEFAULT NULL COMMENT '操作时间',
`expire_time` datetime DEFAULT NULL COMMENT '失效时间',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '⽗订单编号',
`img_url` varchar(200) DEFAULT NULL COMMENT '图⽚路径',
`province_id` int(20) DEFAULT NULL COMMENT '地区',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
-- ----------------------------
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info`
VALUES (476, 'lAXjcL', '134********', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '134********', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NU INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '133********', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NUL INSERT INTO `order_info`
VALUES (480, 'lAXjcL', '134********', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, INSERT INTO `order_info`
VALUES (481, 'QLiFDb', '134********', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NU INSERT INTO `order_info`
VALUES (482, 'iwKjQD', '133********', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NUL INSERT INTO `order_info`
VALUES (483, 'lAXjcL', '134********', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, INSERT INTO `order_info`
VALUES (484, 'QLiFDb', '134********', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NU INSERT INTO `order_info`
VALUES (485, 'iwKjQD', '133********', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NUL INSERT INTO `order_info`
VALUES (486, 'lAXjcL', '134********', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, INSERT INTO `order_info`
VALUES (487, 'QLiFDb', '134********', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NU INSERT INTO `order_info`
VALUES (488, 'iwKjQD', '133********', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NUL INSERT INTO `order_info`
VALUES (489, 'lAXjcL', '134********', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, INSERT INTO `order_info`
VALUES (490, 'QLiFDb', '134********', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NU INSERT INTO `order_info`
VALUES (491, 'iwKjQD', '133********', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NUL
/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '图⽚名称(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';
-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail`
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰⾊移动联通电信4G⼿机双卡双待', 'XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvE INSERT INTO `order_detail`
VALUES (1330, 477, 9, '荣耀10 GT游戏加速 AIS⼿持夜景 6GB+64GB 幻影蓝全⽹通移动联通电信', 'ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, INSERT INTO `order_detail`
VALUES (1331, 478, 4, '⼩⽶Play 流光渐变AI双摄 4GB+64GB 梦幻蓝全⽹通4G 双卡双待⼩⽔滴全⾯屏拍照游戏智能⼿机', 'RqfEFnAOqnqRnNZLFRvBuwXxw INSERT INTO `order_detail`
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰⾊移动联通电信4G⼿机双卡双待', 'IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 890 INSERT INTO `order_detail`
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰⾊移动联通电信4G⼿机双卡双待', 'bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCD  2.使⽤Flink cdc mysql连接器创建flinkSQL映射表
CREATE TABLE order_info(
id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,    user_id BIGINT,
create_time TIMESTAMP(0),
operate_time TIMESTAMP(0),
province_id INT,
order_status STRING,
total_amount DECIMAL(10, 5)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.88.161',
'port' = '3306',
'username' = 'user',
'password' = 'password',
'database-name' = 'bigdata',
'table-name' = 'order_info',
'server-id' = '5401',
'scan.abled'='false'
);
sult-mode=tableau;
select * from  order_info;
CREATE TABLE order_detail(
id BIGINT,
order_id BIGINT,
sku_id BIGINT,
sku_name STRING,
sku_num BIGINT,
order_price DECIMAL(10, 5),
create_time TIMESTAMP(0)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.88.161',
'port' = '3306',
'username' = 'user',
'password' = 'password',
'database-name' = 'bigdata',
'table-name' = 'order_detail',
'server-id' = '5402',
'scan.abled'='false'
);
select * from  order_detail;
3.创建FlinkSQL Hudi连接器创建hudi表
CREATE TABLE order_info_hudi(
id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
user_id BIGINT,
create_time TIMESTAMP(0),
operate_time TIMESTAMP(0),
province_id INT,
order_status STRING,
total_amount DECIMAL(10, 5)
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/order_info_hudi',
'pe' = 'MERGE_ON_READ',
'abled' = 'true',
'write.precombine.field' = 'create_time',
'abled' = 'false'
);
CREATE TABLE order_detail_hudi(
id BIGINT,
order_id BIGINT,
sku_id BIGINT,
sku_name STRING,
sku_num BIGINT,
order_price DECIMAL(10, 5),
create_time TIMESTAMP(0)mysql下载链接
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/order_detail_hudi',
'pe' = 'MERGE_ON_READ',
'abled' = 'true',
'write.precombine.field' = 'create_time',
'hoodie.dkey.field' = 'id',
'abled' = 'false'
);
4.将数据从CDC表插⼊hudi表
insert into order_info_hudi select * from order_info;
select * from hoodie_table/*+ OPTIONS('abled'='true')*/;
insert into order_detail_hudi select * from order_detail;
select * from order_detail;
注意事项:
同时可以查看HDFS⾥的Hudi数据路径,这⾥需要等Flink 5次checkpoint(默认配置可修改)之后才能查看到这些⽬录,⼀开始只有.hoodie⼀个⽂件夹
5.直接⽤hudi表进⾏join操作

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。