Flink消费Kafka到HDFS实现及详解
1.概述
最近有同学留⾔咨询,Flink消费Kafka的⼀些问题,今天笔者将⽤⼀个⼩案例来为⼤家介绍如何将Kafka中的数据,通过Flink任务来消费并存储到HDFS上。
2.内容
这⾥举个消费Kafka的数据的场景。⽐如,电商平台、游戏平台产⽣的⽤户数据,⼊库到Kafka中的Topic进⾏存储,然后采⽤Flink去实时消费积累到HDFS上,积累后的数据可以构建数据仓库(如Hive)做数据分析,或是⽤于数据训练(算法模型)。如下图所⽰:
2.1 环境依赖
整个流程,需要依赖的组件有Kafka、Flink、Hadoop。由于Flink提交需要依赖Hadoop的计算资源和存储
资源,所以Hadoop的YARN和HDFS均需要启动。各个组件版本如下:
组件版本
Kafka 2.4.0
Flink 1.10.0
Hadoop2.10.0
2.2 代码实现
Flink消费Kafka集中的数据,需要依赖Flink包,依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>${tor.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>${flink.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.streaming.version}</version>
</dependency>
编写消费Topic的Flink代码,这⾥不对Topic中的数据做逻辑处理,直接消费并存储到HDFS上。代码如下:
/**
* Flink consumer topic data and store into hdfs.
*
* @author smartloli.
*
* Created by Mar 15, 2020
*/
public class Kafka2Hdfs {
private static Logger LOG = Logger(Kafka2Hdfs.class);
public static void main(String[] args) {
if (args.length != 3) {
<("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
return;
}
String bootStrapServer = args[0];
String hdfsPath = args[1];
int parallelism = Integer.parseInt(args[2]);
StreamExecutionEnvironment env = ExecutionEnvironment();
env.setParallelism(parallelism);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));
// Storage into hdfs
BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));
sink.setBatchSize(1024 * 1024 * 1024); // this is 1GB
sink.setBatchRolloverInterval(1000 * 60 * 60); // one hour producer a file into hdfs
transction.addSink(sink);
}
private static Object configByKafkaServer(String bootStrapServer) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", bootStrapServer);
props.setProperty("group.id", "test_bll_group");
props.put("enable.automit", "true");
props.put("automit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
return props;
kafka最新版本}
}
2.3 注意事项
存储到HDFS时,不⽤添加其他HDFS依赖,只需要Flink采⽤yarn-cluster模式提交即可;
采⽤FSDataOutputStream写⼊时,会先写⼊缓冲区,放在内存中;
Flink每次做Checkpoint的时候,会Flush缓冲区的数据,以及将Pending(已经完成的⽂件,但为被Checkpoint记录,可以通过sink.setPendingSuffix("xxx")来设置)结尾的⽂件记录下来
Flink每60秒(可以通过sink.setInactiveBucketCheckInterval(60 * 1000)来进⾏设置)检测,如果⼀个⽂件的FSDataOutputStream在60秒内(可以通过
sink.setInactiveBucketThreshold(60 * 1000)来设置),都还没有接收到数据,Flink就会认为该⽂件是不活跃的Bucket,那么就会被Flush后关闭该⽂件;
我们再深⼊⼀点查看代码,实际上只是在processingTimeService中注册了当前的时间(currentProcessingTime)+ 60秒不写⼊的时间(inactiveBucketCheckInterval)。
接着通过onProcessIngTime⽅法去不停的判断是否满⾜60秒不写⼊,同时也会判断是否到了滚动时间。代码如下:
public void onProcessingTime(long timestamp) throws Exception {
long currentProcessingTime = CurrentProcessingTime();
closePartFilesByTime(currentProcessingTime);
}
在Flink内部封装了⼀个集合Map<String, BucketState<T>> bucketStates = new HashMap<>();⽤来记录当前正在使⽤的⽂件,key是⽂件的路径,BucketState内部封装了该⽂件的所有信息,包括创建时间,最后⼀次写⼊时间(这⾥的写⼊指的是写⼊缓存区的时间,不是Flush的时间)。当前⽂件是打开还是关闭,写缓冲区的⽅法。都在这⾥。
每次Flink要对⽂件进⾏操作的时候,都会从这⾥拿到⽂件的封装对象;
当程序被取消的时候,当前正在操作的⽂件,会被Flush,然后关闭。然后将⽂件的后缀名从in-progress改为pending。这个前后缀都是可以设置,但如果没有什么特殊需求,默认即可。这⾥拿⽂件,⽤的就是上⾯说的bucketStates这个map。它在close⽅法中,会去遍历这个map,去做上述的操作;代码如下:
public void close() throws Exception {
if (state != null) {
for (Map.Entry<String, BucketState<T>> entry : Set()) {
Value());
}
}
}
每次写⼊的时候,都是会bucketStates这个map中获取对应的对象,如果没有,就会new⼀个该对象。然后先判断是否需要滚动(通过当前⽂件⼤⼩和滚动时间去判断),然后才将数据写⼊缓冲区,更新最后写⼊时间,代码如下:
public void invoke(T value) throws Exception {
Path bucketPath = BucketPath(clock, new Path(basePath), value);
long currentProcessingTime = CurrentProcessingTime();
BucketState<T> bucketState = BucketState(bucketPath);
if (bucketState == null) {
bucketState = new BucketState<>(currentProcessingTime);
state.addBucketState(bucketPath, bucketState);
}
if (shouldRoll(bucketState, currentProcessingTime)) {
openNewPartFile(bucketPath, bucketState);
}
bucketState.writer.write(value);
bucketState.lastWrittenToTime = currentProcessingTime;
}
写⼊和关闭HDFS是通过异步的⽅式的,异步的超时时间默认是60秒,可以通过 sink.setAsyncTimeout(60 * 1000)去设置
3.总结
Flink消费Kafka数据并写到HDFS的代码实现是⽐较简短了,没有太多复杂的逻辑。实现的时候,注意Kafka的地址、反序列化需要在属性中配置、以及Flink任务提交的时候,设置yarn-cluster模式、设置好内存和CPU、HDFS存储路径等信息。
4.结束语
这篇博客就和⼤家分享到这⾥,如果⼤家在研究学习的过程当中有什么问题,可以加进⾏讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《》和《》,喜欢的朋友或同学,可以在公告栏那⾥点击购买链接购买博主的书进⾏学习,在此感谢⼤家的⽀持。关注下⾯,根据提⽰,可免费获取书籍的教学视频。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论