实时数仓FlinkSQL 之维表join
维表是数仓中的⼀个概念,维表中的维度属性是观察数据的⾓度,在建设离线数仓的时候,通常是将维表与事实表进⾏关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在中,维表通常存储在外部设备中(⽐如MySQL,HBase)。对于每条流式数据,可以关联⼀个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,⽬前Flink SQL的维表JOIN仅⽀持对当前时刻维表快照的关联(处理时间语义),⽽不⽀持事实表rowtime 所对应的的维表快照(事件时间语义)。通过本⽂你可以了解到:如何使⽤Flink SQL创建表如何定义Kafka数据源表如何定义MySQL数据源表什么是Temporal Table Join
维表join的案例
Flink SQL 创建表
注意:本⽂的所有操作都是在Flink SQL cli中进⾏的
创建表的语法
解释
COMPUTED COLUMN(计算列)计算列是⼀个通过column_name AS computed_column_expression ⽣成的虚拟列,产⽣的计算列不是物理存储在数据源表中。⼀个计算列可以通过原有数据源表中的某个字段、运算符及内置函数⽣成。⽐如,定义⼀个消费⾦额的计算列(cost),可以使⽤表的价格(price)*数量(quantity)计算得到。计算列常常被⽤在定义时间属性(见另⼀篇⽂章,可以通过PROCTIME()函数定义处理时间属性,语法为proc AS PROCTIME()。除此之外,计算列可以被⽤作提取事件时间列,因为原始的事件时间可能不是TIMESTAMP(3)类型或者是存在JSON串中。
尖叫提⽰:
1.在源表上定义计算列,是在读取数据源之后计算的,计算列需要跟在SELECT查询语句之后;
2.计算列不能被INSERT语句插⼊数据,在INSERT语句中,只能包括实际的⽬标表的schema,不能包括计算列
⽔位线CREATE  TABLE  [catalog_name .][db_name .]table_name  (    { <column_definition > | <computed_column_definition > }[ , ...n ]    [ <watermark_definition > ]  )  [COMMENT  table_comment ]  [PARTITIONED BY  (partition_column_name1, partition_column_name2, ...)]  WITH  (key1=val1, key2=val2, ...)-- 定义表字段<column_definition >:  column_name column_type [COMMENT  column_comment ]-- 定义计算列<computed_column_definition >:  column_name AS  c
omputed_column_expression [COMMENT  column_comment ]-- 定义⽔位线<watermark_definition >:  WATERMARK FOR  rowtime_column_name AS  watermark_strategy_expression
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
⽔位线定义了表的事件时间属性,其语法为:其中rowtime_column_name 表⽰表中已经存在的事件时间字段,值得注意的是,该事件时间字段必须是TIMESTAMP(3)类型,即形如yyyy-MM-dd HH:mm:ss ,如果不是这种形式的数据类型,需要通过定义计算列进⾏转换。
watermark_strategy_expression 定义了⽔位线⽣成的策略,该表达式的返回数据类型必须是TIMESTAMP(3)类型。Flink提供了许多常⽤的⽔位线⽣成策略:
严格单调递增的⽔位线:语法为即直接使⽤时间时间戳作为⽔位线
递增⽔位线:语法为
乱序⽔位线:语法为
分区
根据具体的字段创建分区表,每⼀个分区会对应⼀个⽂件路径
WITH 选项
创建Table source或者Table sink需要指定表的属性,属性是以key/value的形式配置的,具体参考其相对应的connector
尖叫提⽰:
Note:创建表时指定的表名有三种形式:
(1)catalog_name.db_name.table_name
(2)db_name.table_name
(3)table_name
对于第⼀种形式:会将表注册到⼀个名为‘catalog_name’的catalog以及⼀个名为’db_name’d的数据库的元数据中;
对于第⼆种形式:会将表注册到当前执⾏环境的catalog以及名为‘db_name’的数据库的元数据中;
对于第三种形式:会将表注册到当前执⾏环境的catalog与数据库的元数据中
定义Kafka 数据表
kafka是构建实时数仓常⽤的数据存储设备,使⽤Flink SQL创建kafka数据源表的语法如下:WATERMARK FOR  rowtime_column_name AS  watermark_strategy_expression
1WATERMARK FOR  rowtime_column AS  rowtime_column
1WATERMARK FOR  rowtime_column AS  rowtime_column - INTERVAL  '0.001' SECOND
1WATERMARK FOR  rowtime_column AS  rowtime_column - INTERVAL  'string' timeUnit -- ⽐如,允许5秒的乱序WATERMARK FOR  rowtime_column AS  rowtime_column - INTERVAL  '5' SECOND
1
2
3
尖叫提⽰:
指定具体的偏移量位置:默认是从当前消费者组提交的偏移量开始消费
sink分区:默认是尽可能向更多的分区写数据(每⼀个sink并⾏度实例只向⼀个分区写数据),也可以⾃已分区策略。当使⽤ round-robin分区器
时,可以避免分区不均衡,但是会造成Flink实例与kafka broker之间⼤量的⽹络连接
⼀致性保证:默认sink语义是at-least-once
Kafka 0.10+ 是时间戳:从kafka0.10开始,kafka消息附带⼀个时间戳作为消息的元数据,表⽰记录被写⼊kafka主题的时间,这个时间戳可以作为事件时间属性( rowtime attribute)
**Kafka 0.11+**版本:Flink从1.7开始,⽀持使⽤universal版本作为kafka的连接器 ,可以兼容kafka0.
11之后版本
定义MySQL 数据表  'pe' = 'kafka', -- 连接类型        'connector.version' = '0.11',-- 必选: 可选的kafka 版本有:0.8/0.9/0.10/0.11/universal  'pic' = 'topic_name', -- 必选: 主题名称  't' = 'localhost:2181', -- 必选: zk 连接地址  'connector.properties.bootstrap.servers' = 'localhost:9092', -- 必选: Kafka 连接地址  'up.id' = 'testGroup', --可选: 消费者组  -- 可选:偏移量, earliest-offset/latest-offset/group-offsets/specific-offsets  'connector.startup-mode' = 'earliest-offset',                                            -- 可选: 当偏移量指定为specific offsets ,为指定每个分区指定具体位置  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',  'connector.sink-partitioner' = '...',  -- 可选: sink 分区器,fixed/round-robin/custom  -- 可选: 当⾃定义分区器时,指定分区器的类名  'connector.sink-partitioner-class' = 'pany.MyPartitioner',  'pe' = '...',                -- 必选: 指定格式,⽀持csv/json/avro    -- 指定update-mode ,⽀持append/retract/upsert  'update-mode' = 'append',)
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Temporal Table Join
使⽤语法
注意:⽬前,仅⽀持INNER JOIN与LEFT JOIN。在join的时候需要使⽤ FOR SYSTEM_TIME AS OF ,其中table1.proctime表⽰table1的proctime处理时间属性(计算列)。使⽤FOR SYSTEM_TIME AS OF table1.proctime表⽰当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的数据。
样例
使⽤说明  'pe' = 'jdbc', -- 必选: jdbc ⽅式  'connector.url' = 'jdbc:mysql://localhost:3306/flin
k-test', -- 必选: JDBC url  'connector.table' = 'jdbc_table_name',  -- 必选: 表名  -- 可选: JDBC driver ,如果不配置,会⾃动通过url 提取  'connector.driver' = 'sql.jdbc.Driver',                                            'connector.username' = 'name', -- 可选: 数据库⽤户名  'connector.password' = 'password',-- 可选: 数据库密码    -- 可选, 将输⼊进⾏分区的字段名.  'lumn' = 'column_name',    -- 可选, 分区数量.  'ad.partition.num' = '50',    -- 可选, 第⼀个分区的最⼩值.  'ad.partition.lower-bound' = '500',    -- 可选, 最后⼀个分区的最⼤值  'ad.partition.upper-bound' = '1000',    -- 可选, ⼀次提取数据的⾏数,默认为0,表⽰忽略此配置  'ad.fetch-size' = '100',    -- 可选, lookup 缓存数据的最⼤⾏数,如果超过改配置,⽼的数据会被清除  'connector.lookup.cache.max-rows' = '5000',    -- 可选,lookup 缓存存活的最⼤时间,超过该时间旧数据会过时,注意cache.max-rows 与l 必须同时配置  'connector.l' = '10s',    -- 可选, 查询数据最⼤重试次数  'connector.lookup.max-retries' = '3',    -- 可选,写数据最⼤的flush ⾏数,默认5000,超过改配置,会触发刷数据  'connector.write.flush.max-rows' = '5000',    --可选,flush 数据的间隔时间,超过该时间,会通过⼀个异步线程flush 数据,默认是0s  'connector.write.flush.interval' = '2s',  -- 可选, 写数据失败最⼤重试次数  'connector.write.max-retries' = '3' )
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33SELECT  column -names FROM  table1  [AS  <alias1>][LEFT ] JOIN  table2 FOR  SYSTEM_TIME AS  OF  table1.proctime [AS  <alias2>]ON  lumn -name1 = table2.key -name1
1
2
3
4SELECT  o .amout , o .currency , r .rate , o .amount * r .rate FROM  Orders AS  o  JOIN  LatestRates FOR  SYSTEM_TIME AS  OF  o .proctime AS  r  ON  r .currency = o .currency
1
2
3
4
5
6
7
仅⽀持Blink planner 仅⽀持SQL,⽬前不⽀持Table API ⽬前不⽀持基于事件时间(event time)的temporal table join 维表可能会不断变化,JOIN⾏为发⽣后,维表中的数据发⽣了变化(新增、更新或删除),则已关联的维表数据不会被同步变化维表和维表不能进⾏JOIN
维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件
维表Join 案例
背景
Kafka中有⼀份⽤户⾏为数据,包括pv,buy,cart,fav⾏为;MySQL中有⼀份省份区域的维表数据。现将两种表进⾏JOIN,统计每个区域的购买⾏为数量。
步骤
维表存储在MySQL中,如下创建维表数据源:
事实表存储在kafka中,数据为⽤户点击⾏为,格式为JSON,具体数据样例如下:
创建kafka数据源表,如下:CREATE  TABLE  dim_province (    province_id BIGINT ,  -- 省份id    province_name  VARCHAR , -- 省份名称 region_name VARCHAR  -- 区域名称) WITH  (    'pe' = 'jdbc',    'connector.url' = 'jdbc:mysql://192.168.10.203:3306/mydw',    'connector.table' = 'dim_province',    'connector.driver' = 'sql.jdbc.Driver',    'connector.username' = 'root',    'connector.password' = '123qwe',    'connector.lookup.cache.max-rows' = '5000',    'connector.l' = '10min');
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
connect和join的区别
1
2
3

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。