Spark Basics (6): Hands-on Spark Streaming Reading and Writing Kafka (Tested)
Start the environment
#start zookeeper
zkServer.sh start
#start kafka
kafka-server-start.sh /opt/soft/kafka211/config/server.properties
Configure the Flume source file (flume to kafka)
cd /opt/flumeconf
vi conf_08011_kafka.properties
#conf_08011_kafka.properties configuration
a11.channels=c11
a11.sources=s11
a11.sinks=k11
a11.sources.s11.type=spooldir
a11.sources.s11.spoolDir=/opt/retail_db-csv
a11.sources.s11.interceptors=head_filter
a11.sources.s11.interceptors.head_filter.type=regex_filter
a11.sources.s11.interceptors.head_filter.regex=^user.*
a11.sources.s11.interceptors.head_filter.excludeEvents=true
a11.sources.s11.deserializer.maxLineLength=60000
a11.sinks.k11.type=org.apache.flume.sink.kafka.KafkaSink
a11.sinks.k11.kafka.bootstrap.servers=192.168.56.120:9092
a11.sinks.k11.kafka.topic=userfriedns
a11.channels.c11.type=memory
a11.channels.c11.capacity=60000
a11.channels.c11.transactionCapacity=60000
a11.sinks.k11.channel=c11
a11.sources.s11.channels=c11
Create the Kafka topic
kafka-topics.sh --create --zookeeper 192.168.56.120:2181 --topic userfriedns --replication-factor 1 --partitions 1
Import data into Kafka with Flume
flume-ng agent -n a11 -c conf -f /opt/flumeconf/conf_08011_kafka.properties
1. Spark Streaming reads data from Kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object MyReadKafkaHandler {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("name")
    val sc = new SparkContext(conf)
    // Streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))
    // Kafka consumer connection parameters
    val kafkaParam = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.120:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "mykafka14",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    // Create the direct stream
    val streams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("userfriedns"), kafkaParam))
    // Read the data and do some simple processing
    val value = streams.map(_.value).filter(_.split(",").size > 1).flatMap(line => {
      val ids = line.split(",")
      ids(1).split(" ").map(word => (ids(0), word))
    })
    // Print a sample of each batch, then start the streaming job
    value.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
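To make the flatMap above concrete, here is a minimal standalone sketch (plain Scala, no Spark) of the same split logic; the sample line is an assumption that follows the user_friends CSV layout (user id, then a space-separated list of friend ids):

// Minimal sketch: the same split logic applied to one assumed sample line
val line = "1000,2000 3000 4000"
val ids = line.split(",")                            // Array("1000", "2000 3000 4000")
val pairs = ids(1).split(" ").map(w => (ids(0), w))  // Array((1000,2000), (1000,3000), (1000,4000))
pairs.foreach(println)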
2. Spark Streaming writes data to Kafka. There is no ready-made sink interface, so we wrap KafkaProducer with Kafka's producer API and broadcast the wrapper to every Executor.
(1) Wrapping KafkaProducer:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSinks[K, V](fc: () => KafkaProducer[K, V]) extends Serializable {
  // Lazy, so the producer is created on the executor side and we avoid a
  // NotSerializableException at runtime
  lazy val producer = fc()

  // Write to Kafka with a key
  def send(topic: String, key: K, value: V) = {
    producer.send(new ProducerRecord[K, V](topic, key, value))
  }

  // Write to Kafka without a key
  def send(topic: String, value: V) = {
    producer.send(new ProducerRecord[K, V](topic, value))
  }
}

object KafkaSinks {
  // Automatic Scala <-> Java collection conversions
  import scala.collection.JavaConversions._

  def apply[K, V](conf: Map[String, String]): KafkaSinks[K, V] = {
    val func = () => {
      // KafkaProducer expects a java.util.Map (or Properties), so copy the
      // Scala Map into a java.util.HashMap first
      val prod = new KafkaProducer[K, V](new java.util.HashMap[String, AnyRef](mapAsJavaMap(conf)))
      // Runs when the JVM exits
      sys.addShutdownHook {
        // Make sure the producer flushes everything still buffered to Kafka
        // before the Executor JVM shuts down; close() blocks until all
        // previously sent requests have completed
        prod.close()
      }
      prod
    }
    new KafkaSinks[K, V](func)
  }

  // Implicit conversion java.util.Properties => scala.collection.mutable.Map[String, String],
  // then toMap => scala.collection.immutable.Map
  def apply[K, V](conf: Properties): KafkaSinks[K, V] = apply(conf.toMap)
}
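As a quick standalone check of the wrapper (outside Spark), something like the following sketch can be used; the broker address comes from the setup above, while the topic name test_topic and the message are made up for illustration:

import java.util.Properties
import org.apache.kafka.common.serialization.StringSerializer

// Standalone usage sketch of KafkaSinks; topic name and message are assumptions
val props = new Properties()
props.put("bootstrap.servers", "192.168.56.120:9092")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val sink = KafkaSinks[String, String](props)  // Properties overload delegates to the Map overload
sink.send("test_topic", "hello kafka")        // keyless send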
(2) A lazy singleton holding the broadcast KafkaSinks, so it is not recreated on every Worker:
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

object MySingleBaseDAO {
  // Lock around creation so that only one broadcast variable ever exists
  @volatile private var instance: Broadcast[KafkaSinks[String, String]] = null

  // Many callers may arrive at once, but only the first one creates the
  // instance (it starts out as null)
  def getInstance() = {
    if (instance == null) {
      // Reuse the existing SparkContext to build and broadcast the producer config,
      // so there is no confusion about who creates it
      val sc = SparkSession.builder().appName("writerKafka")
        .master("local[2]").getOrCreate().sparkContext
      synchronized {
        if (instance == null) {
          val kafkaParms = Map[String, String](
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.120:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName)
          instance = sc.broadcast(KafkaSinks[String, String](kafkaParms))
        }
        instance
      }
    }
    instance
  }
}
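The @volatile flag plus the double null check (check, enter synchronized, check again) is the classic double-checked locking pattern: only the first caller builds the producer config and broadcasts the KafkaSinks wrapper, and every later call simply returns the existing broadcast variable, so the wrapper is shipped to the executors only once per application.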
(3) Add the step that writes the processed results back to Kafka:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object MyReadKafkaHandler {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("name")
    val sc = new SparkContext(conf)
    // Streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))
    // Kafka consumer connection parameters
    val kafkaParam = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.120:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "mykafka14",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    // Create the direct stream
    val streams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("userfriedns"), kafkaParam))
    // Read the data and do some simple processing
    val value = streams.map(_.value).filter(_.split(",").size > 1).flatMap(line => {
      val ids = line.split(",")
      ids(1).split(" ").map(word => (ids(0), word))
    })
    //    value.foreachRDD(rdd=>rdd.foreach(println))
    // Write the processed data back to Kafka
    value.foreachRDD(rdd => {
      // Get the serializable, broadcast producer wrapper
      val producer = MySingleBaseDAO.getInstance().value
      rdd.foreach(record => {
        producer.send("kafkaSink", record.toString)
      })
    })
    // Start the streaming job
    ssc.start()
    ssc.awaitTermination()
  }
}
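Once the job is running, the (userId, friendId) pairs written by the foreachRDD step show up in the kafkaSink topic and can be checked with any Kafka consumer, for example kafka-console-consumer.sh pointed at the same broker.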
pom.xml (fragment):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
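The pom.xml is truncated here; judging from the imports in the code, the project needs at least the Scala library plus the spark-core, spark-streaming, spark-sql, spark-streaming-kafka-0-10 and kafka-clients dependencies, with versions matching the installed Spark and Kafka.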
