Flink in Practice: DataSource Operations

1. Preface

This article covers the ways Flink reads data, considering only the DataStream API.

The data-reading APIs are defined on StreamExecutionEnvironment, the starting point of a Flink streaming program. A DataStream is constructed from one of these reading APIs.

2. The Four Source Types
Flink stream processing has roughly four categories of sources:
1. Collection-based source — builds a stream from a local collection.
2. File-based source — reads text files (files conforming to TextInputFormat) and returns their contents as strings.
3. Socket-based source — reads from a socket; elements can be split by a delimiter.
4. Custom source.
2.1. Collection-based source

This simply turns the data in a collection into a DataStream.
Note: the Scala API does not support the Iterable, Set, or Map collections.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object CollectionSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism (defaults to the number of CPU cores)
    env.setParallelism(1)

    // 0. Create a DataStream from elements (fromElements)
    val ds0: DataStream[Int] = env.fromElements(1, 3234, 55, 65, 74523, 1)
    // ds0.print()
    // 1. Create a DataStream from tuples (fromElements)
    val ds1: DataStream[(Int, String)] = env.fromElements((1, "bo"), (2, "yi"))
    // ds1.print()
    // 2. From an Array
    val ds2: DataStream[String] = env.fromCollection(Array("bo", "yi"))
    // ds2.print()
    // 3. From an ArrayBuffer
    val ds3: DataStream[String] = env.fromCollection(ArrayBuffer("bo", "yi"))
    // ds3.print()
    // 4. From a List
    val ds4: DataStream[String] = env.fromCollection(List("bo", "yi"))
    // ds4.print()
    // 5. From a ListBuffer
    val ds5: DataStream[String] = env.fromCollection(ListBuffer("BO", "YI"))
    // ds5.print()
    // 6. From a Vector
    val ds6: DataStream[String] = env.fromCollection(Vector("bo", "yi", ""))
    // ds6.print()
    // 7. From a Queue
    val ds7: DataStream[String] = env.fromCollection(mutable.Queue("bo", "yi", "flink", ""))
    // ds7.print()
    // 8. From a Stack
    val ds8: DataStream[String] = env.fromCollection(mutable.Stack("bo", "yi", "flink", ""))
    // ds8.print()
    // 9. From a Stream
    val ds9: DataStream[String] = env.fromCollection(Stream("bo", "yi", "flink", ""))
    // ds9.print()
    // 10. From a Seq
    val ds10: DataStream[String] = env.fromCollection(Seq("bo", "yi", "flink", ""))
    // ds10.print()
    // 11. From a Set (not supported)
    // val ds11: DataStream[String] = env.fromCollection(Set("bo", "yi", "flink", ""))
    // ds11.print()
    // 12. From an Iterable (not supported)
    // val ds12: DataStream[String] = env.fromCollection(Iterable("bo", "yi", "flink", ""))
    // ds12.print()
    // 13. From an ArraySeq
    val ds13: DataStream[String] = env.fromCollection(mutable.ArraySeq("bo", "yi", "flink", ""))
    // ds13.print()
    // 14. From an ArrayStack
    // val ds14: DataStream[String] = env.fromCollection(mutable.ArrayStack("bo", "yi", "flink", ""))
    // ds14.print()
    // 15. From a Map (not supported)
    // val ds15: DataStream[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    // ds15.print()
    // 16. From a Range
    val ds16: DataStream[Int] = env.fromCollection(Range(1, 9))
    // ds16.print()
    // 17. From a sequence of numbers (fromSequence)
    val ds17: DataStream[Long] = env.fromSequence(1, 9)
    ds17.print()

    // Execute the job; in a streaming environment this must be called explicitly
    env.execute("CollectionSource")
  }
}
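For the unsupported Set, Map, and Iterable collections, a common workaround is to convert them to a supported Seq before calling fromCollection. The conversion itself is plain Scala and can be sketched without any Flink dependency (the values below are just illustrative):

```scala
object CollectionConversion {
  def main(args: Array[String]): Unit = {
    // A Set cannot be passed to fromCollection directly; convert to a Seq first
    val asSeq: Seq[String] = Set("bo", "yi", "flink").toSeq
    // A Map can be flattened into a Seq of tuples the same way
    val pairs: Seq[(Int, String)] = Map(1 -> "spark", 2 -> "flink").toSeq
    // Element order of a Set is not guaranteed, so sort for stable display
    println(asSeq.sorted.mkString(","))
    println(pairs.sortBy(_._1).mkString(","))
  }
}
```

The resulting Seq can then be passed to `env.fromCollection` exactly as in the examples above.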
2.2. File-based source

2.2.1. readTextFile

Loads data from the local filesystem or from HDFS.

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSource {
  def main(args: Array[String]): Unit = {
    // 1. Get the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read the file
    val data: DataStream[String] = env.readTextFile("hdfs://h23:8020/tmp/test/score.csv")
    // 3. Print the data
    data.print()
    // 4. Execute the job
    env.execute("FileSource")
  }
}
2.2.2. readCsvFile

Note that readCsvFile belongs to the batch DataSet API, which is why this example uses ExecutionEnvironment rather than StreamExecutionEnvironment.

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object JoinOp {
  // Subject (subject ID, subject name)
  case class Subject(id: Int, name: String)
  // Score (unique ID, student name, subject ID, score)
  case class Score(id: Int, name: String, subjectId: Int, score: Double)

  def main(args: Array[String]): Unit = {
    // 1. Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Read the CSV files as case classes
    val scoreData = env.readCsvFile[Score]("hdfs://h23:8020/tmp/test/score.csv")
    val subjectData = env.readCsvFile[Subject]("hdfs://h23:8020/tmp/test/subject.csv")
    // 3. Join scores with subjects on the subject ID (field 2 of Score, field 0 of Subject)
    val joinData = scoreData.join(subjectData).where(2).equalTo(0)
    // 4. Print the result
    joinData.print()
  }
}
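Since readCsvFile is not available on StreamExecutionEnvironment, the usual DataStream pattern is readTextFile followed by a map that parses each line into the case class. The parsing step can be sketched in plain Scala (the field layout mirrors the Score class above; the sample rows are made up for illustration):

```scala
case class Score(id: Int, name: String, subjectId: Int, score: Double)

object CsvParse {
  // Parse one CSV line into a Score; in a real job this would be the function passed to map()
  def parseScore(line: String): Score = {
    val f = line.split(",")
    Score(f(0).trim.toInt, f(1).trim, f(2).trim.toInt, f(3).trim.toDouble)
  }

  def main(args: Array[String]): Unit = {
    val sampleLines = Seq("1,zhangsan,1,98.5", "2,lisi,2,87.0") // hypothetical rows
    sampleLines.map(parseScore).foreach(println)
  }
}
```

In the streaming job this would read as `env.readTextFile(path).map(parseScore)`, producing a `DataStream[Score]`.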
2.3. Socket-based source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

object SocketSource {
  def main(args: Array[String]): Unit = {
    // 1. Get a stream execution environment (a local one here)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    // 2. Build the socket source, specifying the host and port
    val data: DataStream[String] = env.socketTextStream("localhost", 6666)
    // 3. Transform: split lines into words on spaces
    val res = data.flatMap(_.split(" "))
    // 4. Print the result
    res.print()
    // 5. Start execution
    env.execute("SocketSource")
  }
}
2.4. Custom source

In Flink you can use StreamExecutionEnvironment.addSource(source) to add a data source to your program. Flink ships with a number of ready-made source functions, but you can also write your own: implement SourceFunction for a non-parallel source, or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction for a parallel one.
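The core of any custom source is a run loop that emits records until cancel() flips a flag. That control flow can be sketched without Flink dependencies — SimpleSource and its emit callback below are illustrative stand-ins for SourceFunction and SourceContext.collect, not Flink classes:

```scala
object SourcePattern {
  // Illustrative stand-in for a SourceFunction: run() emits until cancel() is called
  class SimpleSource(records: Seq[String]) {
    @volatile private var isRunning = true

    def run(emit: String => Unit): Unit = {
      val it = records.iterator
      while (isRunning && it.hasNext) {
        emit(it.next()) // in Flink this would be sourceContext.collect(...)
      }
    }

    // Called from another thread when the job is cancelled
    def cancel(): Unit = { isRunning = false }
  }

  def main(args: Array[String]): Unit = {
    val out = scala.collection.mutable.ArrayBuffer[String]()
    new SimpleSource(Seq("a", "b", "c")).run(out += _)
    println(out.mkString(","))
  }
}
```

The volatile flag matters because cancel() is invoked from a different thread than run(); the MySQL example below follows the same run/cancel contract.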
package com.boyi.datasource

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

object MysqlCustomSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Add the custom source
    val mySqlDataStream: DataStream[(Int, String, String, String)] = env.addSource(new MysqlSource)
    // 3. Print the result
    mySqlDataStream.print()
    // 4. Execute the job
    env.execute("MysqlCustomSource")
  }

  // 1. Custom source, extending RichSourceFunction
  class MysqlSource extends RichSourceFunction[(Int, String, String, String)] {
    var connection: Connection = null
    var ps: PreparedStatement = null

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // 1. Load the driver
      Class.forName("com.mysql.jdbc.Driver")
      // 2. Create the connection
      connection = DriverManager.getConnection(
        "jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
        "root", "root")
      // 3. Create the PreparedStatement
      val sql = "select id,username,password,name from user"
      ps = connection.prepareStatement(sql)
    }

    // 2. Implement the run method
    override def run(sourceContext: SourceFunction.SourceContext[(Int, String, String, String)]): Unit = {
      // 4. Execute the query
      val resultSet: ResultSet = ps.executeQuery()
      // 5. Iterate over the result set and emit each row
      while (resultSet.next()) {
        val id = resultSet.getInt("id")
        val username = resultSet.getString("username")
        val password = resultSet.getString("password")
        val name = resultSet.getString("name")
        sourceContext.collect((id, username, password, name))
      }
    }

    override def cancel(): Unit = {
      // Close JDBC resources when the source is cancelled
      if (ps != null) ps.close()
      if (connection != null) connection.close()
    }
  }
}