SparkStreaming之Kafka偏移量管理
本⽂主要介绍 Spark Streaming 应⽤开发中消费 Kafka 消息的相关内容,⽂章着重突出了开发环境的配置以及⼿动管理 Kafka 偏移量的实现。
⼀、开发环境
1、组件版本
CDH 集版本:6.0.1
Spark 版本:2.2.0
Kafka 版本:1.0.1
2、Maven 依赖
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- spark 基础依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- spark-streaming 相关依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- zookeeper 相关依赖 -->
<dependency>
<groupId>keeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5-cdh6.0.1</version>
</dependency>
3、scala 编译
在 l 的 build 节点下的 plugins 中添加 scala 编译插件
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
Maven 打包语句:mvn clean scala:compile compile package
4、打包注意事项
由于 spark、spark-streaming、zookeeper 等均为⼤数据集中必备的组件,因此与之相关的依赖⽆需打包到最终的 jar 包中,可以将其 scope 设置为 provided 即可;否则最终的 jar 包会相当庞⼤。
⼆、Kafka 偏移量
1、偏移量(offset)
这⾥的偏移量是指 kafka consumer offset,在 Kafka 0.9 版本之前消费者偏移量默认被保存在 zookeeper 中
kafka最新版本(/consumers/<group.id>/offsets/<topic>/<partitionId>),因此在初始化消费者的时候需要指定 zookee
per.hosts。
随着 Kafka consumer 在实际场景的不断应⽤,社区发现旧版本 consumer 把位移提交到 ZooKeeper 的做法并不合适。ZooKeeper 本质上只是⼀个协调服务组件,它并不适合作为位移信息的存储组件,毕竟频繁⾼并发的读/写操作并不是 ZooKeeper 擅长的事情。因此在0.9 版本开始 consumer 将位移提交到 Kafka 的⼀个内部 topic(__consumer_offsets)中,该主题默认有 50 个分区,每个分区 3 个副本。
2、消息交付语义
at-most-once:最多⼀次,消息可能丢失,但不会被重复处理;
at-least-once:⾄少⼀次,消息不会丢失,但可能被处理多次;
exactly-once:精确⼀次,消息⼀定会被处理且只会被处理⼀次。
若 consumer 在消息消费之前就提交位移,那么便可以实现 at-most-once,因为若 consumer 在提交位移与消息消费之间崩溃,则consumer 重启后会从新的 offset 位置开始消费,前⾯的那条消息就丢失了;相反地,若提交位移在消息消费之后,则可实现 at-least-once 语义。由于 Kafka 没有办法保证这两步操作可以在同⼀个事务中完成,因此 Kafka 默认提供的就是 at-least-once 的处理语义。
3、offset 提交⽅式
默认情况下,consumer 是⾃动提交位移的,⾃动提交间隔是 5 秒,可以通过设置 automit.interval.ms 参数可以控制⾃动提交的间隔。⾃动位移提交的优势是降低了⽤户的开发成本使得⽤户不必亲⾃处理位移提交;劣势是⽤户不能细粒度地处理位移的提交,特别是在有较强的精确⼀次处理语义时(在这种情况下,⽤户可以使⽤⼿动位移提交)。
所谓的⼿动位移提交就是⽤户⾃⾏确定消息何时被真正处理完并可以提交位移,⽤户可以确保只有消息被真正处理完成后再提交位移。如果使⽤⾃动位移提交则⽆法保证这种时序性,因此在这种情况下必须使⽤⼿动提交位移。设置使⽤⼿动提交位移⾮常简单,仅仅需要在构建KafkaConsumer 时设置 enable.automit=false,然后调⽤ commitSync 或 commitAsync ⽅法即可。
三、使⽤ Zookeeper 管理 Kafka 偏移量
1、Zookeeper 管理偏移量的优势
虽然说新版 kafka 中已经⽆需使⽤ zookeeper 管理偏移量了,但是使⽤ zookeeper 管理偏移量相⽐ kafka ⾃⾏管理偏移量有如下⼏点好处:
1. 可以使⽤ zookeeper 管理⼯具轻松查看 offset 信息;
2. ⽆需修改 groupId 即可从头读取消息;
3. 特别情况下可以⼈为修改 offset 信息。
借助 zookeeper 管理⼯具可以对任何⼀个节点的信息进⾏修改、删除,如果希望从最开始读取消息,则只需要删除 zk 某个节点的数据即可。
2、Zookeeper 偏移量管理实现
import org.I0Itec.zkclient.ZkClient
import org.apache.sumer.ConsumerRecord
import org.apache.kafkamon.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.OffsetRange
llection.JavaConverters._
class ZkKafkaOffset(getClient:()=> ZkClient, getZkRoot :()=> String){
// 定义为 lazy 实现了懒汉式的单例模式,解决了序列化问题,⽅便使⽤ broadcast
lazy val zkClient: ZkClient =getClient()
lazy val zkRoot: String =getZkRoot()
// offsetId = md5(groupId+join(topics))
// 初始化偏移量的 zk 存储路径 zkRoot
def initOffset(offsetId: String): Unit ={
if(!ists(zkRoot)){
}
}
// 从 zkRoot 读取偏移量信息
def getOffset(): Map[TopicPartition, Long]={
val keys = Children(zkRoot)
var initOffsetMap: Map[TopicPartition, Long]=Map()
if(!keys.isEmpty){
for(k:String <- keys.asScala){
val ks = k.split("!")
val value:Long = adData(zkRoot +"/"+ k)
initOffsetMap +=(new TopicPartition(ks(0), Integer.parseInt(ks(1)))-> value)
}
}
initOffsetMap
}
// 根据单条消息,更新偏移量信息
def updateOffset(consumeRecord: ConsumerRecord[String, String]): Boolean ={
val path = zkRoot +"/"+ pic +"!"+ consumeRecord.partition
zkClient.writeData(path, consumeRecord.offset())
true
}
// 消费消息前,批量更新偏移量信息
def updateOffset(offsetRanges: Array[OffsetRange]): Boolean ={
for(offset: OffsetRange <- offsetRanges){
val path = zkRoot +"/"+ pic +"!"+ offset.partition if(!ists(path)){
else{
zkClient.writeData(path, offset.fromOffset)
}
}
true
}
// 消费消息后,批量提交偏移量信息
def commitOffset(offsetRanges: Array[OffsetRange]): Boolean ={ for(offset: OffsetRange <- offsetRanges){
val path = zkRoot +"/"+ pic +"!"+ offset.partition if(!ists(path)){
}
else{
zkClient.writeData(path, offset.untilOffset)
}
}
true
}
def finalize(): Unit ={
zkClient.close()
}
}
object ZkKafkaOffset{
def apply(cong: SparkConf, offsetId: String): ZkKafkaOffset ={
val getClient =()=>{
val zkHost = ("kafka.zk.hosts","127.0.0.1:2181") new ZkClient(zkHost,30000)
}
val getZkRoot =()=>{
val zkRoot ="/kafka/ss/offset/"+ offsetId
zkRoot
}
new ZkKafkaOffset(getClient, getZkRoot)
}
}
3、Spark Streaming 消费 Kafka 消息
llection.JavaConverters._
object RtDataLoader {
def main(args: Array[String]): Unit ={
// 从配置⽂件读取 kafka 配置信息
val props =new Props("xxx.properties")
val groupId = Str("groupId","")
if(StrUtil.isBlank(groupId)){
<("groupId is empty")
return
}
val kfkServers = Str("kfk_servers")
if(StrUtil.isBlank(kfkServers)){
<("bootstrap.servers is empty")
return
}
val topicStr = Str("topics")
if(StrUtil.isBlank(kfkServers)){
<("topics is empty")
return
}
/
/ KAFKA 配置设定
val topics = topicStr.split(",")
val kafkaConf = Map[String, Object](
"bootstrap.servers"-> kfkServers,
"key.deserializer"-> classOf[StringDeserializer],
"value.deserializer"-> classOf[StringDeserializer],
"group.id"-> groupId,
"receive.buffer.bytes"->(102400: java.lang.Integer),
"max.partition.fetch.bytes"->(5252880: java.lang.Integer),
"set"->"earliest",
"enable.automit"->(false: java.lang.Boolean)
)
val conf =new SparkConf().setAppName("ss-kafka").setIfMissing("spark.master","local[2]")
// streaming 相关配置
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
conf.set("spark.abled","true")
conf.set("spark.streaming.backpressure.initialRate","1000")
// 设置 zookeeper 连接信息
conf.set("kafka.zk.hosts", Str("zk_hosts","sky-01:2181"))
// 创建 StreamingContext
val sc =new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc =new StreamingContext(sc,Seconds(5))
// 根据 groupId 和 topics 获取 offset
val offsetId = SecureUtil.md5(groupId + topics.mkString(","))
val kafkaOffset =ZkKafkaOffset(Conf, offsetId)
kafkaOffset.initOffset(ssc, offsetId)
val customOffset: Map[TopicPartition, Long]= Offset(ssc)
// 创建数据流
var stream:InputDStream[ConsumerRecord[String, String]]= null
ains("*")){
StaticLog.warn("使⽤正则匹配读取 kafka 主题:"+ topicStr)
stream = ateDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.SubscribePattern[String, String](Patternpile(topicStr), kafkaConf, customOffset)) }
else{
StaticLog.warn("待读取的 kafka 主题:"+ topicStr)
stream = ateDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
}
// 消费数据
stream.foreachRDD(rdd =>{
// 消息消费前,更新 offset 信息
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaOffset.updateOffset(offsetRanges)
//region 处理详情数据
StaticLog.info("开始处理 RDD 数据!")
//endregion
// 消息消费结束,提交 offset 信息
kafkaOffsetmitOffset(offsetRanges)
})

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。