flume对接kafka,模拟⽣产者实时⽣产数据
flume对接kafka,模拟⽣产者实时⽣产数据
引⾔
flume可以实时的监控⽇志,⽇志每增加⼀条,flume都会感知到,然后可以将这条新的数据传到kafka上,实际⽣产中,⽤户的每个⾏为,⽣成⼀条数据,存到⽇志或数据库中,然后使⽤flume来从⽇志中拉起数据。
任务:⽤shell脚本模拟⽤户⾏为每秒⼗条⽣成的数据存⼊到⽇志中,flume拉起⽇志中的数据传⼊kafka
已有数据:cmcc.json,⽬录⽇志:cmcc.log
,从cmcc.json1秒10条追加到cmcc.log
for line in`cat /root/log/cmcc.json`
do
`echo $line >> /root/log/cmcc.log`
sleep 0.1s
done
2.编写flume脚本
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.pe=exec
#监控的⽂件
agent.sources.s1mand=tail -F /root/log/cmcc.log
agent.sources.s1.channels=c1
agent.pe=memory
agent.channels.c1.capacity=10000
agent.ansactionCapacity=100
#设置⼀个kafka接收器
agent.pe= org.apache.flume.sink.kafka.KafkaSink
#设置kafka的broker地址和端⼝号(所有的)
agent.sinks.k1.brokerList=hadoop01:9092,hadoop02:9092,hadoop03:9092
#设置kafka的topic
agent.pic=cmcc2
#设置⼀个序列化⽅式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#组装
agent.sinks.k1.channel=c1
3.启动kafka
#启动kafka:
nohup bin/kafka-server-start.sh  config/server.properties &
#查看kafka的topic列表:
bin/kafka-topics.sh --list --zookeeper hadoop02:2181
#查看topic中的数据:
bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --from-beginning --topic cmcc
4.执⾏flume脚本
bin/flume-ng agent -c conf -f conf/flume_kafka.sh -n agent -logger=INFO,console
5.执⾏shell脚本
sh readcmcc.sh
6.kafka端查看
kafka为什么那么快#查看有没有⽣成⽬标topic
bin/kafka-topics.sh --list --zookeeper hadoop02:2181
#读取次topic中的数据
bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --from-beginning --topic cmcckafka

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。