Flink批量处理之DataSet
Flink批量处理之DataSet
flink不仅可以⽀持实时流式处理,它也可以⽀持批量处理,其中批量处理也可以看作是实时处理的⼀个特殊情况
1、dataSet的内置数据源
基于⽂件数据源
readTextFile(path) / TextInputFormat:逐⾏读取⽂件并将其作为字符串(String)返回
readTextFileWithValue(path) / TextValueInputFormat:逐⾏读取⽂件并将其作为StringValue返回。StringValue是Flink对String的封装,可变、可序列化,⼀定程度上提⾼性能。 解析以逗号(或其他字符)分隔字段的⽂件。返回元组或pojo
readFileOfPrimitives(path, Class) / PrimitiveInputFormat 跟readCsvFile类似,只不过以原⽣类型返回⽽不是Tuple。 读取SequenceFile,以Tuple2<Key, Value>返回
基于集合数据源:
fromCollection(Collection)
fromCollection(Iterator,Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator,Class)
generateSequence(from, to)
通⽤数据源:
readFile(inputFormat, path)/ FileInputFormat
createInput(inputFormat)/ InputFormat
1.1、⽂件数据源
⼊门案例就是基于⽂件数据源,如果需要对⽂件夹进⾏递归,那么我们也可以使⽤参数来对⽂件夹下⾯的多级⽂件夹进⾏递归
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
object BatchOperate {
def main(args: Array[String]): Unit ={
val inputPath ="D:\\"
val outPut ="D:\\data\\result2"
val configuration: Configuration =new Configuration()
configuration.setBoolean("umeration",true)
//获取程序⼊⼝类ExecutionEnvironment
val env = ExecutionEnvironment
val text = adTextFile(inputPath).withParameters(configuration)
//引⼊隐式转换
import org.apache.flink.api.scala._
val value: AggregateDataSet[(String, Int)]= text.flatMap(x => x.split(" ")).map(x =>(x,1)).groupBy(0).sum(1)
value.writeAsText("d:\\datas\\").setParallelism(1)
}
}
1.2、集合数据源
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object DataSetSource {
def main(args: Array[String]): Unit ={
//获取批量处理程序⼊⼝类ExecutionEnvironment
val environment: ExecutionEnvironment = ExecutionEnvironment
import org.apache.flink.api.scala._
//从集合当中创建dataSet
val myArray =Array("hello world","spark flink")
val collectionSet: DataSet[String]= environment.fromCollection(myArray)
val result: AggregateDataSet[(String, Int)]= collectionSet.flatMap(x => x.split(" ")).map(x =>(x,1)).groupBy(0).sum(1)
result.setParallelism(1).print()
// result.writeAsText("c:\\HELLO.TXT")
}
}
1.3、Flink的dataSet connectors
⽂件系统connector
为了从⽂件系统读取数据,Flink内置了对以下⽂件系统的⽀持:
⽂件系统 Schema 备注
HDFS hdfs:// Hdfs⽂件系统
S3 s3:// 通过hadoop⽂件系统实现⽀持
MapR maprfs:// 需要⽤户添加jar
Alluxio alluxio:// 通过hadoop⽂件系统实现
注意:Flink允许⽤户使⽤实现org.apache.hadoop.fs.FileSystem接⼝的任何⽂件系统。例如S3、 Google Cloud Storage Connector for Hadoop、 Alluxio、 XtreemFS、 FTP等各种⽂件系统
Flink与Apache Hadoop MapReduce接⼝兼容,因此允许重⽤Hadoop MapReduce实现的代码:
使⽤Hadoop Writable data type
使⽤任何Hadoop InputFormat作为DataSource(flink内置HadoopInputFormat)
使⽤任何Hadoop OutputFormat作为DataSink(flink内置HadoopOutputFormat)
使⽤Hadoop Mapper作为FlatMapFunction
使⽤Hadoop Reducer作为GroupReduceFunction
2、dataSet的算⼦介绍
官⽹算⼦
2.1、dataSet的transformation算⼦
Map:输⼊⼀个元素,然后返回⼀个元素,中间可以做⼀些清洗转换等操作
FlatMap:输⼊⼀个元素,可以返回零个,⼀个或者多个元素
MapPartition:类似map,⼀次处理⼀个分区的数据【如果在进⾏map处理的时候需要获取第三⽅资源链接,建议使⽤MapPartition】Filter:过滤函数,对传⼊的数据进⾏判断,符合条件的数据会被留下
Reduce:对数据进⾏聚合操作,结合当前元素和上⼀次reduce返回的值进⾏聚合操作,然后返回⼀个新的值
Aggregate:sum、max、min等
Distinct:返回⼀个数据集中去重之后的元素,data.distinct()
Join:内连接
OuterJoin:外链接
(1)使⽤mapPartition将数据保存到数据库
第⼀步:导⼊mysql的jar包坐标
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
第⼆步:创建mysql数据库以及数据库表
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE/*!32312 IF NOT EXISTS*/`flink_db`/*!40100 DEFAULT CHARACTER SET utf8 */;
USE`flink_db`;
/
*Table structure for table `user` */
DROP TABLE IF EXISTS`user`;
CREATE TABLE`user`(
`id`int(10)NOT NULL AUTO_INCREMENT,
`name`varchar(32)DEFAULT NULL,
PRIMARY KEY(`id`)
)ENGINE=InnoDB AUTO_INCREMENT=4DEFAULT CHARSET=utf8;
第三步:代码开发
import java.sql.PreparedStatement
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object MapPartition2MySql {
def main(args: Array[String]): Unit ={
val environment: ExecutionEnvironment = ExecutionEnvironment
import org.apache.flink.api.scala._
val sourceDataset: DataSet[String]= environment.fromElements("1 zhangsan","2 lisi","3 wangwu")
sourceDataset.mapPartition(part =>{
Class.forName("sql.jdbc.Driver").newInstance()
val conn = java.Connection("jdbc:mysql://localhost:3306/flink_db","root","123456")
part.map(x =>{
val statement: PreparedStatement = conn.prepareStatement("insert into user (id,name) values(?,?)")
statement.setInt(1, x.split(" ")(0).toInt)
statement.setString(2, x.split(" ")(1))
mysql下载jar包
})
}).print()
}
}
(2)连接操作
左外连接,右外连接,满外连接等算⼦的操作可以实现对两个dataset进⾏join操作,按照我们指定的条件进⾏join
object BatchDemoOuterJoinScala {
def main(args: Array[String]): Unit ={
val env = ExecutionEnvironment
import org.apache.flink.api.scala._
val data1 = ListBuffer[Tuple2[Int,String]]()
data1.append((1,"zs"))
data1.append((2,"ls"))
data1.append((3,"ww"))
val data2 = ListBuffer[Tuple2[Int,String]]()
data2.append((1,"beijing"))
data2.append((2,"shanghai"))
data2.append((4,"guangzhou"))
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
if(second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
}).print()
println("===============================")
text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
if(first==null){
(second._1,"null",second._2)
}else{
(first._1,first._2,second._2)
}
}).print()
println("===============================")
text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
if(first==null){
(second._1,"null",second._2)
}else if(second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
}).print()
}
}
2.2、dataSet的partition算⼦
Rebalance:对数据集进⾏再平衡,重分区,消除数据倾斜
Hash-Partition:根据指定key的哈希值对数据集进⾏分区.partitionByHash()
Range-Partition:根据指定的key对数据集进⾏范围分区 .partitionByRange()
Custom Partitioning:⾃定义分区规则,⾃定义分区需要实现Partitioner接⼝partitionCustom(partitioner, “someKey”)或者partitionCustom(partitioner, 0)
在flink批量处理当中,分区算⼦主要涉及到rebalance,partitionByHash
,partitionByRange以及partitionCustom来进⾏分区
object FlinkPartition {
def main(args: Array[String]): Unit ={
val environment: ExecutionEnvironment = ExecutionEnvironment
environment.setParallelism(2)
import org.apache.flink.api.scala._
val sourceDataSet: DataSet[String]= environment.fromElements("hello world","spark flink","hive sqoop")
val filterSet: DataSet[String]= sourceDataSet.filter(x => x.contains("hello"))
.rebalance()
filterSet.print()
}
}
⾃定义分区来实现数据分区操作
第⼀步:⾃定义分区scala的class类
import org.apache.flink.apimon.functions.Partitioner
class MyPartitioner2  extends Partitioner[String]{
override def partition(word: String, num: Int): Int ={
println("分区个数为"+  num)
ains("hello")){
println("0号分区")
}else{
println("1号分区")
1
}
}
}
第⼆步:代码实现
import org.apache.flink.api.scala.ExecutionEnvironment
object FlinkCustomerPartition {
def main(args: Array[String]): Unit ={
val environment: ExecutionEnvironment = ExecutionEnvironment
//设置我们的分区数,如果不设置,默认使⽤CPU核数作为分区个数
environment.setParallelism(2)
import  org.apache.flink.api.scala._
//获取dataset
val sourceDataSet: DataSet[String]= environment.fromElements("hello world","spark flink","hello world","hive hadoop")    val result: DataSet[String]= sourceDataSet.partitionCustom(new MyPartitioner2,x => x +"")
val value: DataSet[String]= result.map(x =>{
println("数据的key为"+ x +"线程为"+ Thread.currentThread().getId)
x
})
value.print()
}
}
2.3、dataSet的sink算⼦

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。