提交FlinkJob任务:实现WordCount 上传Flink Job任务:实现WordCount
⼀实现Job任务的Jar包
1.1 修改l⽂件:添加依赖
<!-根据⾃⼰使⽤的scala与flink的版本修改版本号->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.11.2</version>
</dependency>
1.2 创建scala Object实例FlinkStream
package com.sm.Flink_Test
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object FlinkStream {
def main(args: Array[String]): Unit ={
//创建流处理环境
val env: StreamExecutionEnvironment = ExecutionEnvironment
//从外部命令读取参数,作为主机名和端⼝号
val paramTool: ParameterTool = ParameterTool.fromArgs(args)
val host:("host")
val port:Int("port")
// val host:String="192.168.1.106"
// val port:Int=7777
//配置并⾏度,这⾥暂时没有配置
//env.setParallelism(2)
//监听端⼝
val inputDataStream: DataStream[String]= env.socketTextStream(host,port)
/
/对获取的数据进⾏WordCount
val resultDataSet: DataStream[(String, Int)]= inputDataStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_,1))
.keyBy(0)
.sum(1)
//输出结果
resultDataSet.print()
//启动流处理任务,并命名任务名称
}
}
打包项⽬
⼆上传Job任务运⾏
2.1 在Linux上启动Flink服务启动命令(指定Flink⽬录下执⾏):
/opt/apps/flink-1.12.0/bin/start-cluster.sh
启动后的进程:
2.2 上传jar提交任务
添加参数后提交
就可以看到任务运⾏着
2.3 发送数据并查看效果
发⽣数据
nc -lk 7777
效果:scala不是内部或外部命令
到这⾥Flink上传Jar包新建Job任务就完成了
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论