Writing to HDFS from Flink: BucketingSink
Flink currently offers two commonly used sinks for writing to HDFS: BucketingSink and StreamingFileSink.
BucketingSink will eventually be replaced by StreamingFileSink, but it is still quite capable.
StreamingFileSink supports some features that BucketingSink does not, such as writing to S3 and writing in Parquet format.
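For comparison, here is a minimal StreamingFileSink sketch under assumed conditions (Flink 1.10-era API; the output path, encoder, and thresholds are illustrative, not taken from the job below):

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Row-format sink writing plain strings, bucketed by day.
val streamingFileSink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("/tmp/flinkhdfs-sfs"), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new DateTimeBucketAssigner[String]("yyyy-MM-dd"))
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withMaxPartSize(1024 * 100)           // roll at 100 KB ...
      .withRolloverInterval(60 * 1000)       // ... or after 1 minute ...
      .withInactivityInterval(3 * 60 * 1000) // ... or after 3 minutes without data
      .build())
  .build()
// stream.addSink(streamingFileSink)

Parquet output would use StreamingFileSink.forBulkFormat with a Parquet writer factory instead of forRowFormat.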
1 Code example:
import java.io.File
import java.time.ZoneId
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.fs.{SequenceFileWriter, StringWriter}
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// Needed only if the DayBasePathBucketer example below is uncommented:
// import org.apache.flink.streaming.connectors.fs.Clock
// import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer
// import org.apache.hadoop.fs.Path
object BucketingHdfsSink {
def main(args: Array[String]): Unit = {
val params: ParameterTool = ParameterTool.fromArgs(args)
// set up execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Enable checkpointing (the interval here is illustrative): pending files are
// moved to their final state only when a checkpoint completes.
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.139.13:9092,192.168.139.11:9092,192.168.139.12:9092")
properties.setProperty("group.id", "sink-group")
val stream = env
.addSource(new FlinkKafkaConsumer[String]("sink-topic", new SimpleStringSchema(), properties))
// Element type written to the files: String. Tuples such as
// Tuple2[IntWritable, Text] are also supported.
val bucketingSink = new BucketingSink[String]("/tmp/flinkhdfs")
// Bucketing: determines the directory (bucket) each record is written to.
// DateTimeBucketer can only roll files by time; it cannot derive the output
// path from the data itself. To choose the path per record, implement
// BasePathBucketer, as in the DayBasePathBucketer example below.
// /**
//  * A custom day-based bucketer, similar in effect to DateTimeBucketer,
//  * that derives the output path from the actual data.
//  */
// class DayBasePathBucketer extends BasePathBucketer[String] {
//   /**
//    * Returns the bucket path: a custom sub-directory created under the base path.
//    * @param clock
//    * @param basePath
//    * @param element
//    * @return the bucket directory under the base path
//    */
//   override def getBucketPath(clock: Clock, basePath: Path, element: String): Path = {
//     // characters 1 through 8 of the record hold the date as yyyyMMdd
//     val day = element.substring(1, 9)
//     new Path(basePath + File.separator + day)
//   }
// }
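// A minimal wiring sketch (assumption: the DayBasePathBucketer above has been
// uncommented together with its imports): use it in place of DateTimeBucketer.
// bucketingSink.setBucketer(new DayBasePathBucketer)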
// Built-in bucketer: DateTimeBucketer.
// The default format yyyy-MM-dd--HH buckets by hour; yyyy-MM-dd buckets by day;
// yyyy-MM-dd--HHmm buckets by minute.
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd"))
// Writer: StringWriter is used by default; SequenceFileWriter is also
// available, e.g. for Tuple2[IntWritable, Text] elements.
// bucketingSink.setWriter(new StringWriter())
// Important: rolling policy. A new part file is started when EITHER the size
// threshold or the rollover interval is reached.
bucketingSink.setBatchSize(1024 * 100) // 100 KB
bucketingSink.setBatchRolloverInterval(1 * 60 * 1000) // 1 minute; a file only rolls when new data arrives
// Close a bucket once it has been inactive for longer than this threshold.
bucketingSink.setInactiveBucketThreshold(3 * 60 * 1000L)
// How often to check for inactive buckets.
bucketingSink.setInactiveBucketCheckInterval(30 * 1000L)
// invoke() is called once per record.
// shouldRoll() checks whether the size or rollover threshold has been reached;
// if so, a new part file is created with the in-progress suffix.
/************** onProcessingTime() **************/
/* Triggered by a timer every inactiveBucketCheckInterval; see closePartFilesByTime():
   Check 1: if (now - last modification time) > inactiveBucketThreshold, closeCurrentPartFile() is called.
   Check 2: if (now - creation time) > batchRolloverInterval, closeCurrentPartFile() is called.
*/
// public void onProcessingTime(long timestamp) throws Exception {
//   long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
//   closePartFilesByTime(currentProcessingTime);
//   processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
// }
// bucketingSink.setInProgressPrefix("inProcessPre")
// bucketingSink.setPendingPrefix("pendingpre")
// bucketingSink.setPartPrefix("partPre")
stream.addSink(bucketingSink)
// closeCurrentPartFile() logic:
// 1. Data is first buffered in memory and flushed periodically; the file carries
//    the in-progress suffix. Once a threshold is hit (rollover interval or
//    inactivity), the writer is closed, the buffer is flushed, and the file is
//    renamed with the pending suffix.
// Note: if the system crashes before a checkpoint completes, in-progress and
// pending files are discarded and the data is re-written from the previous
// offset. Once the checkpoint completes, pending files are moved to their
// final state.
env.execute("BucketingHdfsSink")
}
}
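To feed the job some test data, here is a minimal producer sketch. It assumes the kafka-clients dependency is on the classpath; the record layout (a marker character followed by yyyyMMdd) is only an assumption, chosen to match the substring(1, 9) call in DayBasePathBucketer.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SinkTopicProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.139.13:9092,192.168.139.11:9092,192.168.139.12:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // One small record every 100 ms; the leading marker plus yyyyMMdd matches
    // what DayBasePathBucketer would extract with substring(1, 9).
    for (i <- 1 to 10000) {
      producer.send(new ProducerRecord[String, String]("sink-topic", s"e20200414 test-message-$i"))
      Thread.sleep(100)
    }
    producer.close()
  }
}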
3 Running the example:
3.1 Viewing the bucket directories
[hadoop@cdh01 ~]$ hadoop fs -ls /tmp/flinkhdfs
Found 17 items
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:39 /tmp/flinkhdfs/2020-04-14
drwxr-xr-x - hadoop supergroup 0 2020-04-14 10:57 /tmp/flinkhdfs/2020-04-14--1055
drwxr-xr-x - hadoop supergroup 0 2020-04-14 10:57 /tmp/flinkhdfs/2020-04-14--1056
drwxr-xr-x - hadoop supergroup 0 2020-04-14 10:58 /tmp/flinkhdfs/2020-04-14--1057
drwxr-xr-x - hadoop supergroup 0 2020-04-14 10:59 /tmp/flinkhdfs/2020-04-14--1058
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:00 /tmp/flinkhdfs/2020-04-14--1059
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:01 /tmp/flinkhdfs/2020-04-14--1100
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:02 /tmp/flinkhdfs/2020-04-14--1101
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:03 /tmp/flinkhdfs/2020-04-14--1102
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:04 /tmp/flinkhdfs/2020-04-14--1103
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:05 /tmp/flinkhdfs/2020-04-14--1104
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:06 /tmp/flinkhdfs/2020-04-14--1105
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:07 /tmp/flinkhdfs/2020-04-14--1106
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:08 /tmp/flinkhdfs/2020-04-14--1107
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:09 /tmp/flinkhdfs/2020-04-14--1108
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:09 /tmp/flinkhdfs/2020-04-14--1109
drwxr-xr-x - hadoop supergroup 0 2020-04-14 11:10 /tmp/flinkhdfs/2020-04-14--1110
3.2 Viewing the files inside a bucket (the suffix changes over time: in-progress --> pending --> suffix removed)
[hadoop@cdh01 ~]$ hadoop fs -ls /tmp/flinkhdfs/2020-04-14
Found 28 items
-rw-r--r-- 1 hadoop supergroup 102418 2020-04-14 11:12 /tmp/flinkhdfs/2020-04-14/part-0-0
-rw-r--r-- 1 hadoop supergroup 49736 2020-04-14 11:13 /tmp/flinkhdfs/2020-04-14/part-0-1
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:22 /tmp/flinkhdfs/2020-04-14/part-0-10
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:23 /tmp/flinkhdfs/2020-04-14/part-0-11
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:24 /tmp/flinkhdfs/2020-04-14/part-0-12
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:25 /tmp/flinkhdfs/2020-04-14/part-0-13
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:26 /tmp/flinkhdfs/2020-04-14/part-0-14
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:27 /tmp/flinkhdfs/2020-04-14/part-0-15
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:28 /tmp/flinkhdfs/2020-04-14/part-0-16
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:29 /tmp/flinkhdfs/2020-04-14/part-0-17
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:30 /tmp/flinkhdfs/2020-04-14/part-0-18
-rw-r--r-- 1 hadoop supergroup 49883 2020-04-14 11:31 /tmp/flinkhdfs/2020-04-14/part-0-19
-rw-r--r-- 1 hadoop supergroup 49966 2020-04-14 11:14 /tmp/flinkhdfs/2020-04-14/part-0-2
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:32 /tmp/flinkhdfs/2020-04-14/part-0-20
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:33 /tmp/flinkhdfs/2020-04-14/part-0-21
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:34 /tmp/flinkhdfs/2020-04-14/part-0-22
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:35 /tmp/flinkhdfs/2020-04-14/part-0-23
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:36 /tmp/flinkhdfs/2020-04-14/part-0-24
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:37 /tmp/flinkhdfs/2020-04-14/part-0-25
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:38 /tmp/flinkhdfs/2020-04-14/part-0-26
-rw-r--r-- 1 hadoop supergroup 49468 2020-04-14 11:39 /tmp/flinkhdfs/2020-04-14/part-0-27
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:15 /tmp/flinkhdfs/2020-04-14/part-0-3
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:16 /tmp/flinkhdfs/2020-04-14/part-0-4
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:17 /tmp/flinkhdfs/2020-04-14/part-0-5
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:18 /tmp/flinkhdfs/2020-04-14/part-0-6
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:19 /tmp/flinkhdfs/2020-04-14/part-0-7
-rw-r--r-- 1 hadoop supergroup 49883 2020-04-14 11:20 /tmp/flinkhdfs/2020-04-14/part-0-8
-rw-r--r-- 1 hadoop supergroup 49551 2020-04-14 11:21 /tmp/flinkhdfs/2020-04-14/part-0-9
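While a part file is still being written it carries the in-progress marker, and after it is closed it stays pending until the next checkpoint completes. With BucketingSink's default prefixes and suffixes, the transitional names would look roughly like this (illustrative, not captured from this run):

_part-0-28.in-progress   <-- currently being written
_part-0-28.pending       <-- closed, waiting for a checkpoint
part-0-28                <-- finalized after the checkpoint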