Flink 1.10.1 + Kafka: Computing Time-Window Averages over Streaming Data (Java Version)
1. Create a Maven project in IDEA and add dependencies
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.10.1</flink.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.7</slf4j.version>
    <scala.version>2.11</scala.version>
    <!-- Defined here so the ${project.build.scope} references below resolve -->
    <project.build.scope>compile</project.build.scope>
</properties>
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- Flink Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Streaming Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Web UI -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
The Scala version chosen here is 2.11. The flink-runtime_2.11 dependency must be added for the job to run inside the IDEA development environment.
2. Add the main program code
package com.demo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkWindowAvgKafkaStreaming {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-group");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("flink-topic", new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());
        DataStream<Tuple3<String, Long, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                      Collector<Tuple3<String, Long, Long>> out) throws Exception {
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record : input) {
                            sum += record.f1;
                            count++;
                        }
                        Tuple2<String, Long> temp = input.iterator().next();
                        // Emit the result as a 3-tuple: (key, window average, window end time)
                        Tuple3<String, Long, Long> result =
                                new Tuple3<>(temp.f0, sum / count, window.getEnd());
                        out.collect(result);
                    }
                });
        keyedStream.print("output");
        env.execute("FlinkWindowAvgKafkaStreaming");
    }
}
The program reads data from Kafka, transforms each message, keys the transformed stream, applies a tumbling window over it, computes the average within each window, and emits the average together with the window end time. The window size is 10 seconds.
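Note that Flink's tumbling event-time windows are aligned to the epoch rather than to the first arriving element. A minimal sketch of the boundary arithmetic (this mirrors Flink's epoch-aligned window assignment with zero offset; the class and method names here are illustrative, not part of the job above):

// Illustrative sketch: how a 10-second tumbling window's boundaries are
// derived for a given event timestamp (assumes zero window offset).
public class WindowBoundaryDemo {
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs); // align to the epoch
    }

    public static void main(String[] args) {
        long ts = 1643685175905L;          // first sample message timestamp (see below)
        long size = 10_000L;               // 10-second window
        long start = windowStart(ts, size);
        System.out.println(start);         // 1643685170000 (window start)
        System.out.println(start + size);  // 1643685180000 (window end)
    }
}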
3. Sample Kafka messages
1643685175905,machine-1,5436289024
1643685176920,machine-1,5422505984
1643685177924,machine-1,5431537664
1643685178935,machine-1,5425504256
1643685179940,machine-1,5430718464
1643685180947,machine-1,5437231104
1643685181960,machine-1,5522214912
1643685182965,machine-1,5745750016
1643685183976,machine-1,5746868224
The mock messages can be entered by hand through a Kafka message producer, or generated automatically, as in the sketch below.
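For reference, a minimal sketch of such a generator using the plain kafka-clients producer API (the topic name matches the consumer above; the value range and one-second send interval are illustrative assumptions chosen to resemble the sample data):

// Hypothetical mock-data generator: emits "timestamp,machineId,value" messages.
// Assumes the kafka-clients dependency is on the classpath.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;

public class MockMessageProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Random random = new Random();
            while (true) {
                // Message format: timestamp,machineId,value
                String message = System.currentTimeMillis() + ",machine-1,"
                        + (5_400_000_000L + random.nextInt(400_000_000));
                producer.send(new ProducerRecord<>("flink-topic", message));
                Thread.sleep(1000); // roughly one message per second, as in the sample
            }
        }
    }
}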
4. Helper code (MessageWaterEmitter)
package com.demo;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {
    //@Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}
This class defines how the event-time watermark is derived: the timestamp field at the head of each message is used both as the element's event timestamp and to advance the watermark.
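Since the sample timestamps are strictly increasing, a punctuated assigner works here. If messages could arrive slightly out of order, a periodic assigner with bounded out-of-orderness would be the usual alternative in Flink 1.10; a minimal sketch (the 2-second bound is an illustrative assumption, and the class name is hypothetical):

// Alternative: periodic watermarks with bounded out-of-orderness.
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MessagePeriodicWaterEmitter extends BoundedOutOfOrdernessTimestampExtractor<String> {
    public MessagePeriodicWaterEmitter() {
        super(Time.seconds(2)); // tolerate up to 2 seconds of lateness (illustrative value)
    }

    @Override
    public long extractTimestamp(String element) {
        if (element != null && element.contains(",")) {
            return Long.parseLong(element.split(",")[0]);
        }
        return 0L;
    }
}

It would be passed to consumer.assignTimestampsAndWatermarks(...) in place of MessageWaterEmitter; with periodic assigners, the watermark is emitted at the interval configured by the environment's auto-watermark setting rather than per message.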
5. Helper code (MessageSplitter)
package com.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            // Message format: timestamp,machineId,value
            String[] parts = value.split(",");
            out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
        }
    }
}
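For example, the sample line 1643685175905,machine-1,5436289024 is emitted as the tuple (machine-1, 5436289024); keyBy(0) in the main program then groups the stream by machine name.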
6. Run the program and check the output
You can see that a window average is emitted every 10 seconds.
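As a rough illustration (assuming later messages keep advancing the watermark past each window boundary): the five sample records with timestamps before 1643685180000 fall into the window [1643685170000, 1643685180000); their values sum to 27146555392, so the job emits 27146555392 / 5 = 5429311078 (integer division), printed as something like:

output> (machine-1,5429311078,1643685180000)

(The exact prefix may include a subtask index, e.g. output:1>.)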