⼩计--------flume⾃定义kafkasource案例
⾃定义source
⾃定义的消息有两种类型的Source,PollableSource (轮训拉取)与EventDrivenSource (事件驱动),两者的区别在于PollableSource是通过线程不断去调⽤process⽅法,主动拉取消息,⽽EventDrivenSource是需要触发⼀个调⽤机制,即被动等待。在利⽤PollableSource实现⾃定义Source时还需要实现Configurable接⼝,以便在项⽬中初始化某些配置⽤的,定义的Source如下:
1/**
2* EVM告警信息表数据采集
3* PollableSource 轮询拉取数据
4* 通过线程不断调⽤process⽅法,主动拉取消息
5*/
6public class Ev_alarm_Source extends AbstractSource implements Configurable, PollableSource {
7
8
9private static String topic = "";
10private static String groupId = "";
11private KafkaConsumer<String, String> consumer = null;
12private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
13 @Override
14public void configure(Context context) {
15 }
16
17
18/**
19 * 该⽅法会由PollableSource线程去不断调⽤执⾏
20 * 执⾏完后通过getChannelProcess的processEvent⽅法将kafka数据转换为flume的Event发送到channel
21 * @return
22 * @throws EventDeliveryException
23*/
24 @Override
25public Status process() throws EventDeliveryException {
26try{
27 ConsumerRecords<String, String> records = consumer.poll(100);
28for (ConsumerRecord<String, String> record : records){
29
30
31try {
32// 存放解析好后的字符串
33 HashMap<String, String> header = new HashMap<String, String>();
34//解析
35 String value = record.value();
36// 通过ali的fastJson解析kafka中的json串
37 JSONObject jsonObject = JSONObject.parseObject(value);
38
39
40 String msgid = String("msgid");
41 String vin = String("vin");
42 Long tm_c = String("tm_c") == null ? System.currentTimeMillis()/Long("tm_c");
43 Long tm_u = String("tm_u") == null ? System.currentTimeMillis()/Long("tm_u");
44 String t_flg = String("t_flg");
45 String ha_level = String("ha_level");
46 String ca_flg = String("ca_flg");
47 String sef_va = String("sef_va");
48 String dmf_va = String("dmf_va");
49 String egf_va = String("egf_va");
50 String of_va = String("of_va");
51 String lng = String("lng");
52 String lat = String("lat");
53
54
55// 拼接字段⽤#号进⾏连接不同字段
56 StringBuilder sb = new StringBuilder("");
57 sb.append(msgid);
58 sb.append("#");
59 sb.append(vin);
60 sb.append("#");
61 sb.append(sdf.format(new Date(tm_c*1000)));
62 sb.append("#");
63 sb.append(sdf.format(new Date(tm_u*1000)));
64 sb.append("#");
65 sb.append(t_flg);
66 sb.append("#");
67 sb.append(ha_level);
68 sb.append("#");
69 sb.append(ca_flg);
70 sb.append("#");
71 sb.append(sef_va);
72 sb.append("#");
73 sb.append(dmf_va);
74 sb.append("#");
75 sb.append(egf_va);
76 sb.append("#");
77 sb.append(of_va);
78 sb.append("#");
79 sb.append(lng);
80 sb.append("#");
81 sb.append(lat);
82
83
84// 封装到hashmap
85 header.put("timestamp", String.valueOf(tm_c * 1000));
86//将hashMap转换成flume的event发送到channel
88 } catch (Exception e) {
89 e.printStackTrace();
90 }
91
92
93 }
94 }catch (Exception e){
95 e.printStackTrace();
96 }
97// 返回Event的状态
98return Status.READY;
99 }
100
101
102public long getBackOffSleepIncrement() {
103return0;
104 }
105
106
107public long getMaxBackOffSleepInterval() {
108return0;
109 }
110
111
112/**
113 * 释放资源或清空字段
114*/
115 @Override
116public synchronized void stop() {
117if(consumer != null ){
118 consumer.close();//释放资源
119 }
120 }
121
122
123/**
124 * 在执⾏process() ⽅法执⾏会⾸先执⾏⼀次start()
125 * 进⾏初始化配置
126*/
127 @Override
128public synchronized void start() {
129try{
130// 设置kafka topic
132// 设置kafka groupID
134// 设置kafka broker
135 String kafka_broker_list = Property(Conf.KAFKA_BROKER_LIST);
136if(consumer == null ){
139 }
140 }catch (Exception e){
141 e.printStackTrace();
142 }
143 }
144
145
146public static void main(String[] args){
147
148
149 }
150 }
配置f ⽂件
1 #登出
2 a1.sources = r1
3 a1.sinks = k1
4 a1.channels = c1
5
6 # 指定Flume source(要监听的路径)⾃定义kafka⽣产者
7 a1.pe = com.sz.ly.bdp.l.ev_alarm.Ev_alarm_Source
8
9 # 指定Flume sink
10 a1.pe = hdfs
11 a1.sinks.k1.channel = c1
12 # 输出hdfs路径
13 a1.sinks.k1.hdfs.path = hdfs://nameservice1:8020/flume/evm/alarm/%Y%m%d
14 # ⽂本格式
15 a1.sinks.k1.hdfs.writeFormat = Text
16 # 输出⽂件后缀
17 a1.sinks.k1.hdfs.fileSuffix = .txt
18 # DataStream⽂件不压缩,不需要设置deC
19 a1.sinks.k1.hdfs.fileType = DataStream
20 # 每个批次刷新到hdfs上events数量
21 a1.sinks.k1.hdfs.batchSize = 100
22 # hdfs sink启动的操作HDFS的线程数
23 a1.sinks.k1.hdfs.threadsPoolSize = 10
24 # 默认值为0,当⽬前被打开的临时⽂件在该参数指定的时间(秒)内,没有任何数据写⼊,则将该临时⽂件关闭并重命名成⽬标⽂件
25 a1.sinks.k1.hdfs.idleTimeout = 0
26 # 默认值1024,当临时⽂件达到多少(单位:bytes)时,滚动成⽬标⽂件;如果设置成0,则表⽰不根据临时⽂件⼤⼩来滚动⽂件
27 a1.sinks.llSize = 268435456
kafka命令28 # 默认值:10,当events 数据达到该数量时候,将临时⽂件滚动成⽬标⽂件;如果设置成0 ,则表⽰不根据events数据来滚动⽂件
29 a1.sinks.llCount = 0
30 # 临时⽂件间隔60分钟滚动成最终⽂件; 单位: s
31 a1.sinks.llInterval = 3600
32 # 默认值:HDFS副本数,写⼊hdfs⽂件块的最⼩副本数。该参数会影响⽂件的滚动配置,⼀般将该参数配置成1,才可以按照配置正确滚动⽂件
33 a1.sinks.k1.hdfs.minBlockReplicas=1
34 a1.sinks.t-timeout=80000
35 # 默认值: 10000,执⾏HDFS操作的超时时间(单位:毫秒)
36 a1.sinks.k1.hdfs.callTimeout=120000
37 # 使⽤本机时间 %Y%m%d ,在替换转义序列时使⽤本地时间
38 a1.sinks.k1.hdfs.useLocalTimeStamp=true
39
40
41 # 指定Flume channel
42 a1.pe = memory
43 a1.channels.c1.capacity = 200000
44 a1.ansactionCapacity = 20000
45 a1.channels.c1.byteCapacityBufferPercentage = 20
46 a1.channels.c1.byteCapacity = 536870912
47
48 # 绑定source和sink到channel上
49 a1.sources.r1.channels = c1
50 a1.sinks.k1.channel = c1
部署集步骤:
1.⾸先将程序打包、并把jar包放在集flume-ng下的lib下。
2.将conf⽂件放到⼀个固定位置,在执⾏flume-ng命令的时候指定conf⽂件路径
3.执⾏flume-ng命令
#因为在环境变量中配置了flume环境,故不需要绝对路径
flume-ng agent -c conf -f /root/yzq/f -n a1 -logger=INFO,console 1 > /root/yzq/bdp-dc-flume/alarm.log 2>&1 &
4.启动kafka⽣产者进⾏模拟⽣产数据,查看hdfs是否有输出⽂件
kafka-console-producer --broker-list ip:端⼝ —topic 主题名
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论