Writing Flink Job Examples in Java
Setting up a Flink environment is simple: all you need is JDK 1.8.
Here I use Ubuntu 20.04 running as a Windows 10 subsystem (WSL); just download the Flink distribution, extract it, and start it.
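A minimal setup sequence might look like the following; the download URL and the 1.13.3 version are illustrative, chosen to match the flink-1.13.3 directory used later in this article:

huali@admin:~$ wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz
huali@admin:~$ tar -xzf flink-1.13.3-bin-scala_2.12.tgz
huali@admin:~$ cd flink-1.13.3
huali@admin:~/flink-1.13.3$ ./bin/start-cluster.sh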
A Flink example written in Java can either be run locally or be packaged and submitted to the Flink job manager for execution.
Each Flink example is a job.
Dependencies and packaging plugins needed to develop the Flink examples locally are listed below. Note that the core Flink dependencies use provided scope, since the Flink runtime already ships them; when running from an IDE you may need to enable provided-scope dependencies on the run classpath.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>flinkdemo</groupId>
<artifactId>flinkdemo</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>flinkdemo</name>
<url></url>
<properties>
<flink.version>1.13.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>1.11.4</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
You can also configure a log4j.properties so that Flink's runtime logs are visible:
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
The first example is a word count: it reads a text file, splits each line into individual words, groups by word, and counts the occurrences. The input file contains:
hello flink
hello java
hello world
WordCountJob.java
package flinkdemo;

import java.util.Arrays;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Path to the input text file (fill in your own path)
        DataSource<String> dataSource = env.readTextFile("");
        // Split each line into words and emit (word, 1) pairs; .returns(...) is
        // needed because the lambda's type information is erased at compile time
        FlatMapOperator<String, Tuple2<String, Integer>> flatMap = dataSource.flatMap(
                (FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                    Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
                }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by the word (field 0) and sum the counts (field 1)
        AggregateOperator<Tuple2<String, Integer>> sum = flatMap.groupBy(0).sum(1);
        sum.print();
    }
}
Run it directly on the local machine and it prints the result:
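With the three-line input file shown above, the printed result would look something like this (the order of the tuples may vary from run to run):

(world,1)
(flink,1)
(java,1)
(hello,3)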
The program above is a simple batch job: it reads a file, computes word frequencies, and outputs the result. It has clear input and output boundaries; once its stream has been fully processed, the job ends.
There is another kind of operation: listening on a network input, continuously waiting for data to arrive, never terminating. Such a stream is called an unbounded stream. Flink is well suited to this kind of work; its stream processing can produce real-time statistics.
The example below connects to local network port 9999 and waits for user input, then performs a word count. Because the stream is unbounded, the counts can only be updated step by step as data arrives; a final total can never be produced.
package flinkdemo;

import java.util.Arrays;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnboundStreamJob {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Listen on localhost:9999 for newline-separated text
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // Split lines into words, emit (word, 1), key by the word, and keep a running sum
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap(
                (FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                    Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
                }).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).sum(1);
        sum.print("test");
        // A streaming job only starts running once execute() is called
        env.execute("UnboundStreamJob");
    }
}
Before running this example, start a listener on port 9999 using netcat, which ships with Ubuntu:
huali@admin:~/flink-1.13.3$ netcat -lk 9999
Then start the job, and type some words into the netcat session, pressing Enter after each line:
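For example, typing hello flink and then hello java would produce incremental output along these lines (each line is prefixed with the label passed to print(), plus a subtask index when parallelism is greater than 1; the exact prefixes depend on parallelism and key hashing):

test:1> (hello,1)
test:2> (flink,1)
test:1> (hello,2)
test:3> (java,1)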
This job can also be packaged locally and submitted to the Flink job manager for execution. Go to the project directory and run the package command:
mvn clean package
If packaging succeeds, a jar named after the project is generated under the target directory. In the Flink web UI, switch to the Submit New Job menu:
Click the "Add New" button on the right side of the list and upload the jar just built. Once the upload succeeds, click the jar entry in the list, and a drop-down panel appears:
In the first input box (the entry class), enter the fully qualified name of the job class to run, flinkdemo.UnboundStreamJob, then click the "Submit" button.
The page switches to a view showing the job as RUNNING, which means everything is normal. At this point there is no network input yet and the job is waiting for the user to type, so that part of the output is highlighted in red.
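Alternatively, the jar can be submitted from the command line with the flink CLI. A minimal sketch, assuming the build above produced target/flinkdemo-1.0.jar (the jar path is illustrative):

huali@admin:~/flink-1.13.3$ ./bin/flink run -c flinkdemo.UnboundStreamJob ~/flinkdemo/target/flinkdemo-1.0.jar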