connectflink_Flinkconnect算⼦实践
借⽤官⽹的⼀个例⼦:
假设存在⼀个序列,序列中的元素是具有不同颜⾊与形状的图形,我们希望在序列⾥相同颜⾊的图形中寻满⾜⼀定顺序模式的图形对(⽐如在红⾊的图形⾥,有⼀个长⽅形跟着⼀个三⾓形)。 同时,我们希望寻的模式也会随着时间⽽改变。
在这个例⼦中,我们定义两个流,⼀个流包含图形(Item),具有颜⾊和形状两个属性。另⼀个流包含特定的规则(Rule),代表希望寻的模式。
Flink 开发的时候,经常会遇到这种情况,数据的输⼊源有多个,需要将⼀些流先关联起来(⽐如:清洗规则、动态配置),再做后续的计算。
对于这样的场景,可能很容易就想到使⽤ join api ,直接将两个流 join 起来。
实际上,这样个需求,使⽤ join api 是不太适合的, join 是基于窗⼝的,要在窗⼝内有关联的数据,才能进⾏后续的计算。 这个需要中,规则流的某些规则在整个程序的执⾏周期⾥,可能只会有⼀次。
调⽤DateStream 的 broadcast ⽅法,将⼀个流解释成⼴播流,再调⽤⾮⼴播流(keyed 或者 non-keyed)
的 connect() 关联, 将BroadcastStream 当做参数传⼊。 这个⽅法的返回参数是 BroadcastConnectedStream,具有类型⽅法 process(),传⼊⼀个特殊的CoProcessFunction 来书写我们的关联逻辑。
⼀般来说使⽤⼴播流的时候,在每个并发中都会保留⼴播的全部数据(可能没办法区分那些是需要的,那些不需要),这样就会导致⼴播状态越来越⼤,如果⼴播状态更新⽐较频率的,就不太适⽤了。
注: ⼴播状态使⽤的是 Operator State,运⾏时保存在内存中。
所以就进⼊了今天的重点是 connect 在⾮⼴播流中的使⽤。
DataStream,DataStream → ConnectedStreams"Connects" two data streams retaining their types. Connect allowing
forshared state between the two streams.
DataStream someStream = //...
DataStream otherStream = //...
ConnectedStreams connectedStreams = t(otherStream);
看过上⾯⼴播状态内容的同学应该知道,connect 的顺序是很重要的,那⾮⼴播流的 connect 呢?
进⼊主题前,先思考这⼏个问题:
1、两个 non keyed 流 connect 的时候,数据是怎么分配的(不同并发的数据怎么分,随机分配、循环?)
2、keyed 流 connect non keyed 流 的时候,数据是怎么分配的
3、non keyed 流 connect keyed 流 的时候,数据是怎么分配的
4、两个 keyed 流 connect 的时候,数据是怎么分配的
对应问题4,很容易就想到: 两个流的 keyBy 都是对下游 CoProcessFunction 的并发做的分区,所以相同 key 的数据⼀定会发到⼀起。
为了解决其他问题,有了如下程序:
⼤致业务: 数据流关联配置流,获取编码对应转码值,关联不上的就⽤默认值。
为了⽅便数据源就实现个 SourceFunction ⽣成 0 到 300 的数值,拼接 5 位随机的字符串
class RadomSourceFunction extendsSourceFunction[String] {
var flag= trueoverride def cancel(): Unit={
flag= false}
override def run(ctx: SourceFunction.SourceContext[String]): Unit={while(flag) {for (i
var Stringwhile (nu.length < 3) {
nu= "0" +nu
}//code + other
Thread.sleep(2000)
}
}
}
}
主程序如下:
connect和join的区别val config = new FlinkKafkaConsumer[String]("dynamic_config", newSimpleStringSchema, Prop)
val configStream=env
.addSource(config)
.name("configStream")
val input= env.addSource(newRadomSourceFunction)
.name("radomFunction")//⾮并发的souce function 不能添加并发
.map(str =>str)
.setParallelism(4)
val t(configStream)
.process(newCoProcessFunction[String, String, String] {
var mapState: MapState[String, String]=_//var map: util.HashMap[String, String] = null
override def open(parameters: Configuration): Unit={//thinking broken, if use keyed state, must keyby upstream//Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
mapState = MapState(new MapStateDescriptor[String, String]("mapState", classOf[String], classOf[String]))//map = new util.HashMap[String, String]()
}
override def processElement1(element: String, context: CoProcessFunction[String, String, String]#Context, out: Collector[String]): Unit={//checkouk map keys
val it =mapState.keys().iterator()
var size= 0
while(it.hasNext) {
val ()
size+= 1}//val size = map.size()
println("keys size : " +size)
val citeInfo= element.split(",")
val code= citeInfo(0)
var (code)//var va = (code)//不能转码的数据默认输出 中国(code=xxx)
if (va == null) {
va= "中国(code=" + code + ")";
}else{
va= va + "(code=" + code + ")"}
}
override def processElement2(element: String, context: CoProcessFunction[String, String, String]#Context, collector:
Collector[String]): Unit={
IndexOfThisSubtask+ ", " +element)
val param= element.split(",")//update mapState
mapState.put(param(0), param(1))//map.put(param(0), param(1))
}
override def close(): Unit={
mapState.clear()
}
}).setParallelism(4)
val sink= new FlinkKafkaProducer[String]("non_key_connect_demo", newSimpleStringSchema(), Prop)
stream.print()
然后就。。。。
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()'operation.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeC
at org.apache.flink.streaming.api.MapState(StreamingRuntimeContext.java:216)
at com.venn.t.KeyedConnectDemo$$anon$1.open(KeyedConnectDemo.scala:67)
at org.apache.flink.apimon.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.CoProcessOperator.open(CoProcessOperator.java:59)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
直接赏我个 NullPointerException,突然想起来,如果不做 keyBy,不能⽤ keyed state
这点⼩⼩的⿇烦,当然难不倒我
直接上 使⽤ hashmap 存更新流的数,⼀样看结果(注: 数据量不⼤可以使⽤ Operator State)
执⾏图如下:
配置流是1个并发,下游 Co-Process 是 4 个并发,特意在处理更新流的procesElement2 中打印出接收到数据对应的 subtask id
IndexOfThisSubtask + ", " + element)
从 kafka 输⼊数据,查看打印的数据,随意截取部分数据贴在这⾥:
subtask id : 2, 031,营⼝市
subtask id :3, 016,宁波市
subtask id :1, 030,锦州市
subtask id :0, 041,黄州市
subtask id :1, 034,朝阳市
subtask id :3, 020,丽⽔市
subtask id :1, 038,襄城市
数据在各个并发之间循环发送,但是啊,没有做 key 的流,只有⼀个并发拿到了配置数据,其他的怎么办呢?
只有在不⽤ broadcast 和 keyed 的时候,需要考虑这个问题,代码⾥⾯已经给了解决办法,感兴趣的同学可以去看下
总结下 connect 算⼦的内容,⼀般场景下,使⽤ broadcast connnect 和 keyed connect,能保证每个并发(key)能拿到需要的数据,⼴播是通过⼴播数据到下游的所有并发来保证的,keyed 是通过 对相同键做 key 来保证的。
欢迎关注Flink菜鸟,会不定期更新Flink(开发技术)相关的推⽂
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论