FlinkSql教程(5)
Flink 双流Join
概述
在之前的Flink教程03⾥⾯给⼤家讲过了维表Join,今天来和⼤家分享⼀下双流Join
⽬前Flink双流Join分成两类:UnBounded Join 和 Time Interval Join
在有些场景下,⽤哪个都⾏,不过后者的性能会优于前者,⽽且如果在双流Join之后想要再进⾏窗⼝计算,那么只能使⽤Time Interval Join,⽬前的UnBounded Join后⾯是没有办法再进⾏Event Time的窗⼝计算
UnBounded Join
分为两种Join,⼀种是Inner Join,另⼀种是Outer Join
Inner Join:双流Join最⼤的问题是两边的数据量是不⼀样的,会存在⼀条流中的数据已经到达,⽽另⼀条流中与之匹配的数据还未到达的情况,那么Flink是如何解决这个问题的呢?举个例⼦:假设左表先来
了3条数据,Join 的key分别是1、2、3,右表中尚未有数据到达,那么Flink 会将左表的那三条数据缓存在Join节点的state中,同时不会有数据下发。此时,右表来了⼀条key是4的数据,未能与左表中的key关联上,那么这条数据同样也会被缓存在Join节点的state中。⽽当右表来了⼀条key为1的数据时,与左表中key为1的数据成功关联,那么此时,会将这两条数据Join起来之后的数据下发,⽽其他尚未匹配上的数据会在state中继续等待,直到他们的有缘⼈出现,才能够继续前进。
Outer Join:⽀持LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN三种语法,此处我们以LEFT JOIN为例。还是左表先来三条数据,key分别是1、2、3,不过此时的结果会和上⾯的不⼀样,他们三个虽然还会在Join节点的state中缓存,但是会将数据下发,那么⼤家会问了,右边的数据怎么办,此时并没有Join成功啊,如果下发数据不就存在异常吗?答:Flink会将右边的数据补上NULL,当右表中key为1、2、3的数据出现时,会将刚才下发的三条数据撤回,将右表中的数据重新填充到下发的三条数据中,之后,再将这三条数据下发;⽽如果右表先到了,左表尚未到达的话,会⼀直等待,不会先⾏下发再撤回。RIGHT JOIN与之相似,只是⼀个下发左边,⼀个下发右边;FULL OUTER JOIN是两边都会下发和撤回。
缺点:
因为要存放⼤量的数据在state中,如果左右表的数据⼀直⽆法匹配,那么久⽽久之,内存很容易就被
打爆。解决办法有加机器和使⽤RocksDBStateBackend,同时需要配上合理的状态清理配置,具体的写法可以⾃⾏翻看官⽹⽂档
Join之前最好先根据主键去重,不然会缓存⼤量⽆⽤数据在Join节点的state节点中。举个栗⼦:key为1的数据因为各种原因出现了三条,⽽这三条实际上是同⼀条数据。那么,在Join时,如果右表只有⼀条key为1的数据,那么只会有⼀条数据下发(Inner Join)另外两条⼀直在死等;或者下发⼀条有右边数据的和两条右边数据为NULL的数据(Left Outer Join),同时,这两条数据也会在Join节点的state中缓存,等待右表的数据到达。同样也会打爆我们的内存。去重可以很好的减少Join节点内存的压⼒
假设现在有A、B、C三条流要进⾏JOIN,SQL写法为:A LEFT JOIN B ON A.KEY1 = B.KEY1 LEFT JOIN C ON B.KEY2 = C.KEY2,如果A与B Join的结果产⽣了⼤量B.KEY2为NULL的数据,那么在与C Join时,必然会出现热点问题。那么如何解决呢?我们可以交换Join的顺序,让B、C先⾏Join,产⽣的结果再与A流进⾏Join,这样就能很好的解决热点问题
下⾯我们通过代码和运⾏结果,来看看UnBounded Join的写法和产⽣结果
package FlinkSql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.vironment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.pes.Row;
public class FlinkSql05 {
public static final String KAFKA_TABLE_SOURCE_DDL_01 = ""+
"CREATE TABLE t1 (\n" +
" user_id BIGINT,\n" +
" order_id BIGINT,\n" +
" ts BIGINT\n" +
") WITH (\n" +
" 'pe' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要⼀致\n" +
" 'pic' = 'unBoundedJoin01_t1', -- 之前创建的topic \n" +
" 'up.id' = 'flink-test-0', -- 消费者组,相关概念可⾃⾏百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 't' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'pe' = 'csv' -- csv格式,和topic中的消息格式保持⼀致\n" +
")";connect和join的区别
public static final String KAFKA_TABLE_SOURCE_DDL_02 = ""+
"CREATE TABLE t2 (\n" +
" order_id BIGINT,\n" +
" item_id BIGINT,\n" +
" ts BIGINT\n" +
") WITH (\n" +
" 'pe' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要⼀致\n" +
" 'pic' = 'unBoundedJoin01_t2', -- 之前创建的topic \n" +
" 'up.id' = 'flink-test-0', -- 消费者组,相关概念可⾃⾏百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 't' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'pe' = 'csv' -- csv格式,和topic中的消息格式保持⼀致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_03 = ""+
"CREATE TABLE t3 (\n" +
" user_id BIGINT,\n" +
" order_id BIGINT,\n" +
" ts BIGINT,\n" +
" r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),
-- 计算列,因为ts是bigint,没法作为⽔印,所以⽤UDF转成TimeS " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定⽔印⽣成⽅式\n"+
") WITH (\n" +
" 'pe' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要⼀致\n" +
" 'pic' = 'timeIntervalJoin_01', -- 之前创建的topic \n" +
" 'up.id' = 'flink-test-0', -- 消费者组,相关概念可⾃⾏百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 't' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'pe' = 'csv' -- csv格式,和topic中的消息格式保持⼀致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_04 = ""+
"CREATE TABLE t4 (\n" +
" order_id BIGINT,\n" +
" item_id BIGINT,\n" +
" ts BIGINT,\n" +
" r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 计算列,因为ts是bigint,没法作为⽔印,所以⽤UDF转成TimeS " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定⽔印⽣成⽅式\n"+
") WITH (\n" +
" 'pe' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要⼀致\n" +
" 'pic' = 'timeIntervalJoin_02', -- 之前创建的topic \n" +
" 'up.id' = 'flink-test-0', -- 消费者组,相关概念可⾃⾏百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 't' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'pe' = 'csv' -- csv格式,和topic中的消息格式保持⼀致\n" +
")";
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论