Flink开发环境部署和配置
⼀、Flink 开发环境部署和配置
Flink 是⼀个以 Java 及 Scala 作为开发语⾔的开源⼤数据项⽬,代码开源在 GitHub 上,并使⽤ Maven 来编译和构建项⽬。对于⼤部分使⽤ Flink 的同学来说,Java、Maven 和 Git 这三个⼯具是必不可少的,另外⼀个强⼤的 IDE 有助于我们更快的阅读代码、开发新功能以及修复 Bug。因为篇幅所限,我们不会详述每个⼯具的安装细节,但会给出必要的安装建议。
关于开发测试环境,Mac OS、Linux 系统或者 Windows 都可以。如果使⽤的是 Windows 10 系统,建议使⽤ Windows 10 系统的Linux ⼦系统来编译和运⾏。
1. 编译 Flink 代码
在我们配置好之前的⼏个⼯具后,编译 Flink 就⾮常简单了,执⾏如下命令即可:
mvn clean install -DskipTests
或者
mvn clean package -DskipTests
常⽤编译参数:
-Dfast 主要是忽略QA plugins和JavaDocs的编译
-Dhadoop.version=2.6.1 指定hadoop版本
–settings=${maven_file_path} 显式指定l配置⽂件
当成功编译完成后,能在当前 Flink 代码⽬录下的 flink-dist/target/⼦⽬录 中看到如下⽂件(不同的 Flink 代码分⽀编译出的版本号不同,这⾥的版本号是 Flink 1.5.1):
img
其中有三个⽂件可以留意⼀下:
版本 注释
flink-1.5. Binary 的压缩包
flink-1.5.1-bin/flink-1.5.1 解压后的 Flink binary ⽬录
flink-dist_2.11-1.5.1.jar 包含 Flink 核⼼功能的 jar 包
注意:
国内⽤户在编译时可能遇到编译失败“Build Failure”(且有 MapR 相关报错),⼀般都和 MapR 相关依赖的下载失败有关,即使使⽤了推荐的 l 配置(其中 Aliyun Maven 源专门为 MapR 相关依赖做了代理),还是可能出现下载失败的情况。问题主要和MapR 的 Jar 包⽐较⼤有关。遇到这些问题时,重试即可。在重试之前,要先根据失败信息删除 Maven local repository 中对应的⽬录,否则需要等待 Maven 下载的超时时间才能再次出发下载依赖到本地。
2. 开发环境准备
推荐使⽤ IntelliJ IDEA IDE 作为 Flink 的 IDE ⼯具。官⽅不建议使⽤ Eclipse IDE,主要原因是 Eclipse 的 Scala IDE 和 Flink ⽤Scala 的不兼容。
如果你需要做⼀些 Flink 代码的开发⼯作,则需要根据 Flink 代码的 tools/maven/⽬录 下的配置⽂件来配置 Checkstyle ,因为 Flink 在编译时会强制代码风格的检查,如果代码风格不符合规范,可能会直接编译失败。
⼆、运⾏ Flink 应⽤
1. 基本概念
运⾏ Flink 应⽤其实⾮常简单,但是在运⾏ Flink 应⽤之前,还是有必要了解 Flink 运⾏时的各个组件,因为这涉及到 Flink 应⽤的配置问题。图 1 所⽰,这是⽤户⽤ DataStream API 写的⼀个数据处理程序。可以看到,在⼀个 DAG 图中不能被 Chain 在⼀起的Operator 会被分隔到不同的 Task 中,也就是说 Task 是 Flink 中资源调度的最⼩单位。
img图 1. Parallel Dataflows
如下图图 2 所⽰,Flink 实际运⾏时包括两类进程:
JobManager(⼜称为 JobMaster):协调 Task 的分布式执⾏,包括调度 Task、协调创 Checkpoint 以及当 Job failover 时协调各个Task 从 Checkpoint 恢复等。
TaskManager(⼜称为 Worker):执⾏ Dataflow 中的 Tasks,包括内存 Buffer 的分配、Data Stream 的传递等。
img图 2 . Flink Runtime 架构图
图 3 所⽰,Task Slot 是⼀个 TaskManager 中的最⼩资源分配单位,⼀个 TaskManager 中有多少个 Task Slot 就意味着能⽀持多少并发的 Task 处理。需要注意的是,⼀个 Task Slot 中可以执⾏多个 Operator,⼀般这些 Operator 是能被 Chain 在⼀起处理的。
img图 3 . Process
2. 运⾏环境准备
准备 Flink binary
○ 直接从 Flink 官⽹上下载 Flink binary 的压缩包
○ 或者从 Flink 源码编译⽽来
安装 Java,并配置 JAVA_HOME 环境变量
3. 单机 Standalone 的⽅式运⾏ Flink
(1)基本的启动流程
最简单的运⾏ Flink 应⽤的⽅法就是以单机 Standalone 的⽅式运⾏。
启动集:
./bin/flink run examples/streaming/WordCount.jar
⼤家可以⾃⾏探索 Web 界⾯中展⽰的信息,⽐如,我们可以看看 TaskManager 的 stdout ⽇志,就可以看到 Word Count 的计算结果。
我们还可以尝试通过“–input”参数指定我们⾃⼰的本地⽂件作为输⼊,然后执⾏:
./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}
停⽌集:
./bin/stop-cluster.sh
(2)常⽤配置介绍
conf / slaves
conf / slaves ⽤于配置 TaskManager 的部署,默认配置下只会启动⼀个 TaskManager 进程,如果想增加⼀个 TaskManager 进程的,只需要⽂件中追加⼀⾏“localhost”。
也可以直接通过“ ./bin/taskmanager.sh start ”这个命令来追加⼀个新的 TaskManager:
./bin/taskmanager.sh start|start-foreground|stop|stop-all
conf/flink-conf.yaml
conf/flink-conf.yaml ⽤于配置 JM 和 TM 的运⾏参数,常⽤配置有:
The heap size for the JobManager JVM
jobmanager.heap.mb: 1024
The heap size for the TaskManager JVM
taskmanager.heap.mb: 1024
The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4
the managed memory size for each task manager.
Standalone 集启动后,我们可以尝试分析⼀下 Flink 相关进程的运⾏情况。执⾏ jps 命令,可以看到 Flink 相关的进程主要有两个,⼀个是 JobManager 进程,另⼀个是 TaskManager 进程。我们可以进⼀步⽤ ps 命令看看进程的启动参数中“-Xmx”和“-Xms”的配置。然后我们可以尝试修改 flink-conf.ya
ml 中若⼲配置,然后重启 Standalone 集看看发⽣了什么变化。
需要补充的是,在 Blink 开源分⽀上,TaskManager 的内存计算上相对于现在的社区版本要更精细化,TaskManager 进程的堆内存限制(-Xmx)⼀般的计算⽅法是:
TotalHeapMemory = taskmanager.heap.mb + size +
taskmanager.mb(默认值为128MB)
⽽最新的 Flink 社区版本 Release-1.7 中 JobManager 和 TaskManager 默认内存配置⽅式为:
The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
Flink 社区 Release-1.7 版本中的“taskmanager.heap.size”配置实际上指的不是 Java heap 的内存限制,⽽是 TaskManager 进程总的内存限制。我们可以同样⽤上述⽅法查看 Release-1.7 版本的 Flink
binary 启动的 TaskManager 进程的 -Xmx 配置,会发现实际进程上的 -Xmx 要⼩于配置的“taskmanager.heap.size”的值,原因在于从中扣除了 Network buffer ⽤的内存,因为 Network buffer ⽤的内存⼀定是 Direct memory,所以不应该算在堆内存限制中。
(3)⽇志的查看和配置
JobManager 和 TaskManager 的启动⽇志可以在 Flink binary ⽬录下的 Log ⼦⽬录中到。Log ⽬录中以“flink-{id}-
${hostname}”为前缀的⽂件对应的是 JobManager 的输出,其中有三个⽂件:
flink-{id}-{user}-standalonesession-{hostname}.out:进程执⾏时的stdout输出
flink-{id}-{hostname}”为前缀的⽂件对应的是 TaskManager 的输出,也包括三个⽂件,和 JobManager 的输出⼀致。
⽇志的配置⽂件在 Flink binary ⽬录的 conf ⼦⽬录下,其中:
log4j-cli.properties:⽤ Flink 命令⾏时⽤的 log 配置,⽐如执⾏“ flink run”命令
log4j-yarn-session.properties:⽤ yarn-session.sh 启动时命令⾏执⾏时⽤的 log 配置
log4j.properties:⽆论是 Standalone 还是 Yarn 模式,JobManager 和 TaskManager 上⽤的 log 配置都是 log4j.properties 这三个“log4j.properties”⽂件分别有三个“l”⽂件与之对应,如果想使⽤ Logback 的同学,之需要把与之对应的“log4j.*properties”⽂件删掉即可,对应关系如下:
log4j-cli.properties -> l
log4j-yarn-session.properties -> l
log4j.properties -> l
需要注意的是,“flink-{id}-和{user}-taskexecutor-{hostname}”都带有“,{id}”表⽰本进程在本机上该⾓⾊(JobManager 或TaskManager)的所有进程中的启动顺序,默认从 0 开始。
(4)进⼀步探索
尝试重复执⾏“./bin/start-cluster.sh”命令,然后看看 Web 页⾯(或者执⾏jps命令),看看会发⽣什么?可以尝试看看启动脚本,分析⼀下原因。接着可以重复执⾏“./bin/stop-cluster.sh”,每次执⾏完后,看看会发⽣什么。
4. 多机部署 Flink Standalone 集
部署前要注意的要点:
每台机器上配置好 Java 以及 JAVA_HOME 环境变量
每台机器上部署的 Flink binary 的⽬录要保证是同⼀个⽬录
如果需要⽤ HDFS,需要配置 HADOOP_CONF_DIR 环境变量配置
根据你的集信息修改 conf/masters 和 conf/slaves 配置。
修改 conf/flink-conf.yaml 配置,注意要确保和 Masters ⽂件中的地址⼀致:
user −standalonesession −hostname .log :代码中的⽇志输出flink −id −
user −standalonesession −hostname −gc .log :JV M 的GC 的⽇志Log ⽬录中以“flink −id −
jobmanager.rpc.address: h.tbsite
确保所有机器的 Flink binary ⽬录中 conf 中的配置⽂件相同,特别是以下三个:
conf/masters
conf/slaves
conf/flink-conf.yaml
然后启动 Flink 集:
./bin/start-cluster.sh
提交 WordCount 作业:
./bin/flink run examples/streaming/WordCount.jar
上传 WordCount 的 Input ⽂件:
hdfs dfs -copyFromLocal story /test_dir/input_dir/story
提交读写 HDFS 的 WordCount 作业:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output
hdfs:///test_dir/output_dir/output
增加 WordCount 作业的并发度(注意输出⽂件重名会提交失败):
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output
hdfs:///test_dir/output_dir/output --parallelism 20
5. Standalone 模式的 HighAvailability**(HA)部署和配置**
通过图 2 Flink Runtime 架构图,我们可以看到 JobManager 是整个系统中最可能导致系统不可⽤的⾓⾊。如果⼀个 TaskManager 挂了,在资源⾜够的情况下,只需要把相关 Task 调度到其他空闲 TaskSlot 上,然后 Job 从 Checkpoint 中恢复即可。⽽如果当前集中只配置了⼀个 JobManager,则⼀旦 JobManager 挂了,就必须等待这个 JobManager 重新恢复,如果恢复时间过长,就可能导致整个Job 失败。
因此如果在⽣产业务使⽤ Standalone 模式,则需要部署配置 HighAvailability,这样同时可以有多个 JobManager 待命,从⽽使得JobManager 能够持续服务。
img图 4. Flink JobManager HA ⽰意图
注意:
如果想使⽤ Flink standalone HA 模式,需要确保基于 Flink Release-1.6.1 及以上版本,因为这⾥社区有个 bug 会导致这个模式下主JobManager 不能正常⼯作。
接下来的实验中需要⽤到 HDFS,所以需要下载带有 Hadoop ⽀持的 Flink Binary 包。
(1)(可选)使⽤ Flink ⾃带的脚本部署 ZK
Flink ⽬前⽀持基于 Zookeeper 的 HA。如果你的集中没有部署 ZK,Flink 提供了启动 Zookeeper 集的脚本。⾸先修改配置⽂
件“conf/zoo.cfg”,根据你要部署的 Zookeeper Server 的机器数来配置“server.X=addressX:peerPort:leaderPort”,其
中“X”是⼀个 Zookeeper Server的唯⼀ ID,且必须是数字。
The port at which the clients will connect
scala不是内部或外部命令clientPort=3181
server.1=h.tbsite:4888:5888
server.2=h.tbsite:4888:5888
server.3=h.tbsite:4888:5888
然后启动 Zookeeper:
./bin/start-zookeeper-quorum.sh
jps 命令看到 ZK 进程已经启动:
img
停掉 Zookeeper 集的命令:
./bin/stop-zookeeper-quorum.sh
(2)修改 Flink Standalone 集的配置
修改 conf/masters ⽂件,增加⼀个 JobManager:
$cat conf/masters
h.tbsite:8081
h.tbsite:8081
之前修改过的 conf/slaves ⽂件保持不变:
$cat conf/slaves
h.tbsite
h.tbsite
h.tbsite
修改 conf/flink-conf.yaml ⽂件:
配置high-availability mode
high-availability: zookeeper
配置zookeeper quorum(hostname和端⼝需要依据对应zk的实际配置)
(可选)设置zookeeper的root⽬录
<: /test_dir/test_standalone2_root
(可选)相当于是这个standalone集中创建的zk node的namespace
high-availability.cluster-id: /test_dir/test_standalone2
JobManager的meta信息放在dfs,在zk上主要会保存⼀个指向dfs路径的指针
high-availability.storageDir: hdfs:///test_dir/recovery2/
需要注意的是,在 HA 模式下 conf/flink-conf.yaml 中的这两个配置都失效了(想想为什么)。
jobmanager.rpc.address
jobmanager.rpc.port
修改完成后,确保配置同步到其他机器。
启动 Zookeeper 集:
./bin/start-zookeeper-quorum.sh
再启动 Standalone 集(要确保之前的 Standalone 集已经停掉):
./bin/start-cluster.sh
分别打开两个 Master 节点上的 JobManager Web 页⾯:
可以看到两个页⾯最后都转到了同⼀个地址上,这个地址就是当前主 JobManager 所在机器,另⼀个就是 Standby JobManager。以上我们就完成了 Standalone 模式下 HA 的配置。
接下来我们可以测试验证 HA 的有效性。当我们知道主 JobManager 的机器后,我们可以把主 JobManager 进程 Kill 掉,⽐如当前主JobManager 在 h.tbsite 这个机器上,就把这个进程杀掉。
可以发现后⼀个链接已经不能展⽰了,⽽前⼀个链接可以展⽰,说明发⽣主备切换。
然后我们再重启前⼀次的主 JobManager:
6. 使⽤ Yarn 模式跑 Flink job
img图 5. Flink Yarn 部署流程图
相对于 Standalone 模式,Yarn 模式允许 Flink job 的好处有:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。