Flink 实时读取Mysql 增量⽇志数据并写⼊GreenPlumMysql
FlinkStreamETL
0.功能说明
概括:利⽤Flink实时统计Mysql数据库BinLog⽇志数据,并将流式数据注册为流表,利⽤Flink SQL将流表与Mysql的维表进⾏JOIN,最后将计算结果实时写⼊Greenplum/Mysql。
文豆php1.需求分析
1.1需求实时统计各个地区会议室的空置率,预定率,并在前端看板上实时展⽰。源系统的数据库是Mysql ,它有三张表,分别是:t_meeting_info(会议室预定信息表)、t_meeting_location(属地表,表)、t_meeting_address(会议室属地表,维度表)。
1.2说明
t_meeting_info 表中的数据每时每刻都在更新数据,若通过JDBC ⽅式定时查询Mysql ,会给源系统数据库造成⼤量⽆形的压⼒,甚⾄会影响正常业务的使⽤,并且时效性也不⾼。需要在基本不影响Mysql 正常使⽤的情况下完成对增量数据的处理。
上⾯三张表的DDL 语句如下:
mysql面试题csdnt_meeting_info(会议室预定信息表,这张表数据会实时更新)CREATE TABLE `t_meeting_info ` ( `id ` int (11) NOT NULL AUTO_INCREMENT COMMENT '主键id', `meeting_code ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '会议业务唯⼀编号', `msite ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议名称', `mcontent ` varchar (4096) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议内容', `attend_count ` int (5) DEFAULT NULL COMMENT '参会⼈数', `type ` int (5) DEFAULT NULL COMMENT '会议类型 1 普通会议 2 融合会议 3 视频会议 4 电话会议', `status ` int (255) DEFAULT NULL COMMENT '会议状态 ', `address_id ` int (11) DEFAULT NULL COMMENT '会议室id', `email ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建⼈邮箱', `contact_tel ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '', `create_user_name ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建⼈姓名', `create_user_id ` varchar (100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建⼈⼯号', `creator_org ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建⼈组织', `m
start_date ` datetime DEFAULT NULL COMMENT '会议开始时间', `mend_date ` datetime DEFAULT NULL COMMENT '会议结束时间', `create_time ` datetime DEFAULT NULL COMMENT '创建时间', `update_user ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新⼈', `update_time ` datetime DEFAULT NULL COMMENT '更新时间', `company ` int (10) DEFAULT NULL COMMENT '会议所在属地code', `sign_status ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '预留字段', PRIMARY KEY (`id `) USING BTREE , KEY `t_meeting_info_meeting_code_index ` (`meeting_code `) USING BTREE , KEY `t_meeting_info_address_id_index ` (`address_id `) USING BTREE , KEY `t_meeting_info_create_user_id_index ` (`create_user_id `)) ENGINE =InnoDB AUTO_INCREMENT =65216 DEFAULT CHARSET =utf8 ROW_FORMAT =DYNAMIC COMMENT ='会议主表';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
js幻灯片轮播19
20
21
22
23
24
25
26
t_meeting_location(属地表,地区维表)
t_meeting_address(会议室属地表,会议室维表)
2.实现⽅案⽅案如下图所⽰:利⽤Canal监听Mysql数据库的增量BinLog⽇志数据(JSON格式)将增量⽇志数据作为Kafka的⽣产者,Flink解析Kafka的Topic 中的数据并消费将计算后的流式数据(Stream)注册为Flink 中的表(Table)
最后利⽤Flink与t_meeting_location、t_meeting_address维表进⾏JOIN,将最终的结果写⼊数据库。CREATE TABLE `t_meeting_location ` ( `id ` int (11) NOT NULL AUTO_INCREMENT , `short_name ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地简称', `full_name ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地全称', `code ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '属地code', `region_id ` int (11) DEFAULT NULL COMMENT '地区id', `create_user ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建⼈', `update_user ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新⼈', `create_time ` datetime DEFAULT NULL COMMENT '创建时间', `update_time ` datetime DEFAULT NULL CO
MMENT '更新时间', PRIMARY KEY (`id `) USING BTREE , UNIQUE KEY `t_meeting_location_code_uindex ` (`code `) USING BTREE ) ENGINE =InnoDB AUTO_INCREMENT =103 DEFAULT CHARSET =utf8 ROW_FORMAT =DYNAMIC COMMENT ='属地表';1
2
3
特效相机下载安装4
5
6
7
8
9
10
11
12
13CREATE TABLE `t_meeting_address ` ( `id ` int (11) NOT NULL AUTO_INCREMENT COMMENT '主键id', `name ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室名称', `location ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '所在属地', `shared ` int (3) DEFAULT NULL COMMENT '是否共享 0 默认不共享 1 全部共享 2 选择性共享', `cost ` int (10) DEFAULT NULL COMMENT '每⼩时成本', `size ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室容量⼤⼩', `bvm_ip ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT 'BVM IP', `type ` int (2) DEFAULT NULL COMMENT '会议室类型 1 普通会议室 2 视频会议室', `create_time ` datetime DEFAULT NULL COMMENT '创建时间', `create_user ` varchar (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建⼈', `update_time ` datetime DEFAULT NULL COMMENT '更新时间', `update_user ` varchar (255) CHARACTER SE
T utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新⼈', `status ` int (2) DEFAULT NULL COMMENT '是否启⽤ ,0 未启⽤ 1已启⽤ 2已删除', `order ` int (5) DEFAULT NULL COMMENT '排序', `approve ` int (2) DEFAULT NULL COMMENT '是否审批 0 不审批 1 审批', PRIMARY KEY (`id `) USING BTREE , KEY `t_meeting_address_location_index ` (`location `) USING BTREE , KEY `order ` (`order `) USING BTREE ) ENGINE =InnoDB AUTO_INCREMENT =554 DEFAULT CHARSET =utf8 ROW_FORMAT =DYNAMIC COMMENT ='会议室表';
1
2
3
4
5
return0不写会怎么样6
7
8
9
10
11
12
13
14
15
16
17
ligerui树配置18
19
20
需要服务器:CentOS7,JDK8、Scala 2.12.6、Mysql、Canal、Flink1.9、Zookkeeper、Kafka 3.可视化⽅案
Tableau实时刷新Greenplum,FineBI也可以(秒级)
DataV也可以每⼏秒刷新⼀次
Flink计算后的结果,写⼊到缓存,前端开发可视化组件进⾏展⽰(实时展⽰)。
4.项⽬地址
由于CSDN不⽅便粘贴图⽚,详细内容请见:
5.参考⽬录
[1].
[2].
[3].
[4].
[5].
[6].
[7].
[8].
[9].
[10].Flink` 流与维表的关联
[11].Flink DataStream流表与维表Join(Async` I/O)
12. `flink 流表join mysql表
13. `flink1.9 使⽤LookupableTableSource实现异步维表关联
14. Flink异步之⽭盾-锋利的Async I/O
15.Flink 的时间属性及原理解析
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论