Flink官⽅⽂档学习—《基本API概念》
⽬录
基本API概念
Flink 是实现了分布式集合转换操作(例如 filtering,mapping,updating state,joining,grouping,defining windows,aggregating 等)的⼀般程序。集合最开始从sources(例如,从 files,kafka topics,或本地,内存中集合读取)创建。结果通过sinks返回,⽐如将数据写出到(分布式)⽂件,或标准输出(例如,终端的命令⾏)。Flink 程序可以在多种环境下运⾏:单节点,或嵌⼊到其他程序中。计算可以在本地JVM中进⾏,也可以在多台机器组成的集上进⾏。
取决于数据源类型,例如有界或者⽆界数据源,您可以编写⼀个批处理或流处理程序。其中,批处理程序⽤DataSet API,流处理程序⽤DataStream API。本篇指南将会介绍这两种API的通⽤概念,但是您可以查看Streaming Guide和Batch Guide来查看这两种API的详细信息。
注意:当展⽰API⽤法的实例时,我们将使⽤StreamingExecutionEnvironment和 DataStream API。⼆者的概念是⼀样的,使⽤DataSet时只需要 替换 ExecutionEnvironment和 DataSet。
1. Dataset和DataStream
在程序中,Flink使⽤特殊的类DataSet和DataStream来表⽰数据。你可以把他们当做是可以包含副本的不可变数据集。DataSet数据是有限的,⽽DataStream中元素数量可以是⽆限个的。
在⼀些⽅⾯,这些数据集和常规的Java集合是不同的。⾸先,他们是不可变的,这意味着集合⼀旦创建,其中的元素就不可以在添加或者移除。甚⾄简单的检查⾥⾯的元素都不可以。
数据集最开始通过向Flink程序添加source来创建,⽽新的数据集可以通过转换(例如map,filter等等)这些数据集衍⽣⽽来。2. Flink程序构成(Anatomy of a Flink Program)
Flink程序看起来和那些转换数据集的常规程序⼀样。每个程序都由下⾯⼏个相同的基本部分构成:
1)获取执⾏环境;
2)加载/创建初始数据;
3)指定转换操作;
4)指定计算结果输出⽅式;
5)开始执⾏。
现在我们⼤致介绍下这⼏个步骤。详细信息还请查看相应章节。请注意:Scala Data API的核⼼类在 包下,Scala DataStream API的核⼼类在包下。
StreamingExecutionEnvironment是所有Flink程序的基础,可以通过StreamingExecutionEnvironment的静态⽅法获得:getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
⼀般来说,你只需要使⽤getExecutionEnvironment()⽅法,因为这个⽅法将根据上下⽂返回正确的执⾏环境:⽐如说你使⽤IDE 或者作为通⽤Java程序来运⾏你的程序,它将创建⼀个本地环境,在本地机器上执⾏你的程序。如果你把程序打成了jar包,并⽤过命令⾏启动,Flink集管理器将会执⾏你的main⽅法,并且getExecutionEnvironment()⽅法会返回⼀个在集上运⾏你的程序的执⾏环境。
⾄于指定数据源,执⾏环境有多种⽅法来读取⽂件:你可以⼀⾏⾏的读,读取CSV⽂件,或者使⽤完全⾃定义的输⼊数据格式。仅仅读取⼀个text⽂件为⾏队列,你可以使⽤下⾯的⽅法:
val env = ExecutionEnvironment()
val text: DataStream[String] = adTextFile("file:///path/to/file")
这将返回⼀个DataStream,在这之上你可以应⽤转换操作来创建新的衍⽣的DataStreams。
你可以在Dataset是调⽤转换函数来转换DataSet(官⽹这⾥有问题)。⽐如map操作:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
通过将原始集合中的每⼀个字符串转换成整型,这将创建⼀个新的DataStream(很明显官⽹这⾥是有问题的,但是操作是⼀样的)。
得到包含最终结果的DataStream后,你可以创建⼀个sink来将结果写出到外部系统:
writeAsText(path: String)
print()
完成整个程序的逻辑编码后,你需要调⽤StreamingExecutionEnvironment的execute()⽅法来触发程序执⾏。根据执⾏环境ExecutionEnvironment的不同,程序将在本地或者集上运⾏。
execute()⽅法有⼀个返回值:JobExecutionResult,这个返回值包含了程序执⾏时间和累加器结果。
(JobExecutionResult继承⾃JobSubmissionResult,JobSubmissionResult有JobID属性,所以通过JobExecutionResult也可获得JobID属性。)
⾄于DataSet和DataStream的source和sink的详细信息,请参考相应的指导⽂档。
3. 延迟计算(Lazy Evaluation)
所有的Flink程序都是延迟执⾏的:当执⾏程序的主⽅法时,数据的加载和转化操作并没有直接发⽣,
⽽是创建这些操作,并添加到程序计划。这些操作实际是在ExecutionEnvironment的execute()⽅法触发后执⾏的。程序是在本地执⾏还是在集上运⾏,取决于ExecutionEnvironment的类型。
延迟计算使得你可以构建复杂的程序,⽽Flink只需把它当做⼀个整体的计划单元运⾏。
4. 指定键(Specifying Keys)
⼀些转换操作(join,coGroup,keyBy,groupBy)要求集合内的元素需要定义有键。另⼀些操作
(Reduce,GroupReduce,Aggregate,Windows)需要在使⽤这些操作前将数据按key分组。
DataSet这样分组(官⽹不给⼒,没给出Scala版本):
val input: DataSet[String] = adTextFile("src/main/")
val reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
DataStream这样指定key:
val input: DataStream[String] = adTextFile("src/main/")
val windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/)
Flink的数据模型不是基于键值对的,因此,你不需要⼿动的把数据集打包成键值对。键是虚拟的:他们被定义为实际数据上的函数,来引导分组操作。
注意:下⾯的讨论中,我们将使⽤DataStream API和KeyBy展⽰。对于DataSet API,只需要⽤DataSet和groupBy替换即可。
4.1 为Tuple定义键(Define keys for Tuples)
最简单的⽤例是根据Tuple的⼀个或多个字段对Tuple分组:
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
Tuple以它的第⼀个字段分组(也就是⽰例中Int类型的那个字段)
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = upBy(0,1)
这⾥,我们使⽤了⼀个组合键来对Tuple分组。这个组合件由第⼀和第⼆个字段组成。
嵌套Tuple需要注意的⼀个点:如果你的DataStream内有嵌套的tuple,⽐如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
使⽤KeyBy(0)指定键,系统将会使⽤整个Tuple2作为键(整型和浮点型的)。如果想使⽤Tuple2内部字段作为键,你可以使⽤字段来表⽰键,这种⽅法会在后⾯阐述。
4.2 使⽤字段表达式定义键(Define keys using Field Expressions)
你可以使⽤基于字符串的字段表达式来引⽤嵌套字段,⽤这些字段来为grouping,sorting,joining或者coGroupping定义键。
字段表达式使选择嵌套(组合)类型数据(例如Tuple,POJO)中的字段变得⾮常容易。
在下⾯的例⼦中,我们有⼀个包含两个字段:word和count的 wc POJO。为了根据word字段分组,我们只需把字段名传给KeyBy()函数:
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
字段表达式语法:
通过字段名选择POJO的字段。例如 user 表⽰ ⼀个POJO的user字段;
Tuple通过offset来选择,"_1"和"5"分别代表第⼀和第六个Scala Tuple字段
POJO和Tuple的嵌套字段也可以拿到。例如 "user.zip"可以表⽰POJO的user属性的zip属性。任意的嵌套和混合都是⽀持的,例如"_2.user.zip"或"user._4.1.zip"
也可以选择全类型,使⽤通配符表达式"_"。这对不是POJO或者Tuple的类型也适⽤。
字段表达式⽰例:
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
上述⽰例代码的有效字段表达式如下:
"count": wc 类的count字段;
"complex": 递归的选取ComplexNestedClass的所有字段;
"complex.word._3":ComplexNestedClass类中的tuple word的第三个字段;
"complex.hadoopCitizen":选择Hadoop IntWritable类型。
4.3 使⽤键选择器函数定义键(Define keys using Key Selector Functions)
还有⼀种定义键的⽅式叫做“键选择器”函数。键选择器函数需要⼀个元素作为⼊参,返回这个元素的键。这个键可以是任何类型的,也可从指定计算中⽣成。
下⾯的⽰例展⽰了⼀个键选择函数,这个函数仅仅返回了⼀个对象的字段。
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
5. 指定转换函数(Specifying Transformation Functions)
⼤多数的转换操作需要⽤户⾃⼰定义函数。这⼀章节列举了指定这些函数的⼏种不同⽅式。
5.1 Lambda函数(Lambda Functions)
之前见过的,所有的操作都能接受Lambda函数来描述操作:
val data: DataSet[String] = // [...]
data.filter { _.startsWith("") }
val data: DataSet[Int] = // [...]
// or
5.2 Rich Functions
scala不是内部或外部命令所有能把Lambda函数当做参数接收的转换操作都可以接收Rich函数来替换Lambda函数。例如:
data.map { x => x.toInt }
可以写成:
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
};
data.map(new MyMapFunction())
Rich函数也可以定义成匿名的:
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
Rich函数除了提供⽤户⾃定义函数(map,reduce等),还提供了四种⽅法:open,close,getRuntimeContext和setRuntimecontext。这些功能在参数化函数、创建和确定本地状态、获取⼴播变量、获取运⾏时信息(例如累加器和计数器)和迭代信息时⾮常有帮助。
6. ⽀持的数据类型(Supported Data Types)
Flink对DataSet和DataStream中可使⽤的元素类型添加了⼀些约束。原因是系统可以通过分析这些类型来确定有效的执⾏策略。有7中不同的数据类型:
Java Tuple 和 Scala Case类;
Java POJO;
基本类型;
通⽤类;
值;
Hadoop Writables;
特殊类型。
6.1 Tuples 和 Case 类

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。