java与flink_flink实战—安装与使⽤flink分为三种模式运⾏:local,cluster,基于cloud的运⾏。
本⽂描述了flink在local模式下的安装与使⽤。
环境介绍
系统
Centos-7.0 x64
安装
下载并安装
⾸先下载并安装java
$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
1
2
3
4
下载flink
从以下链接下载flink相关版本,如下:
解压并启动
$ tar xzvf flink-1.7.1-bin-hadoop28-scala_
$ cd flink-1.7.1/
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host firstvm.
Starting taskexecutor daemon on host firstvm.
查看启动⽇志
在flink-1.7.1的log⽬录下查看启动⽇志⽂件:
$ cat flink-*-standalonesession-1-*.log
1
任务执⾏⽇志⽂件
另外要说明的是flink的任务执⾏⽂件在以下⽂件中:
log/flink-*-taskexecutor-1-*.out
查看管理界⾯
按以上命令启动flink时,可以查看flink⾃带的管理界⾯。
关闭防⽕墙
$ sudo systemctl stop firewall
查看管理界⾯
在浏览器中输⼊以下地址:
1
即可看到flink的启动界⾯。
运⾏测试程序
开启本地服务器
开启⼀个新的终端,并输⼊命令:
$ nc -l 9001
1
提交Flink计划
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001
1
让flink任务程序连接到服务器端⼝并等待输⼊。您可以检查Web界⾯以验证作业是否按预期运⾏:输⼊测试⽂本
在nc -l 9001的终端中输⼊⼀下⽂字:
this is a test
test is ok
1
2
查看计算结果
java安装完整教程cd log
$ cat flink-*-taskexecutor-1-*.out
this : 1
ok : 1
test : 2
a : 1
is : 2
可以看到该任务执⾏成功了。
单词计数的源代码
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
/
/ the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = Int("port");
} catch (Exception e) {
return;
}
// get the execution environment
final StreamExecutionEnvironment env = ExecutionEnvironment(); // get input data by connecting to the socket
DataStream text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream windowCounts = text
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) {
for (String word : value.split("\\s")) {
}
}
})
.
keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
总结
本⽂讲述了flink的local模式的安装与使⽤。可以看到,flink的安装和使⽤还是相当⽅便的。---------------------

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。