⼤数据项⽬之⽇志数据采集(⼀)
⽇志数据采集
平台搭建模型设计
1、⽇志采集
⽅案选择
⽅案⼀:使⽤flume直接从⽇志服务器到hdfs
不能采⽤原因:
1、由于⽇志服务器较多,直接从⽇志服务器到HDFS,会导致HDFS的访问量过⾼,
bootstrap项目
2、由于flume采集到不同服务器上的同⼀时间段的⽇志,会写⼊到HDFS上同⼀个⽬录中,⽽同⼀⽂件的写⼊不⽀持多线程同时写
⼊。
⽅案⼆:使⽤flume聚合再传输给hdfs
此⽅案解决了⽅案⼀中多线程同时写⼊的问题。
不能采⽤原因:由于flume聚合,多个flume将会写⼊到⼀个flume中,末端的flume的传输负载较⼤,造成数据堆积,采集停⽌
⽅案三:使⽤flume–>kafka–>flume的⽅式
中间通过kafka集的缓冲,缓解了flume的负载,因此采⽤该⽅案。
第⼀层flume配置规划
flume读取本地⽇志服务器的数据,需要监控多⽬录中的⽂件的变化,所以source端采⽤taildir的⽅式,
⽅案⼀:memory channel + kafka sink
⽅案⼆:kafka channel
优势:⽆需经过kafka sink,传输速率更⾼
配置
为了后期对数据进⾏分析,需要考虑数据的格式问题,需要先将数据进⾏清洗,通过,将前端传输的⾮json格式的数据清洗实现
import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.Iterator;
import java.util.List;
/**
* @ClassName : LocalKfkInterceptor
* @Author : kele
* @Date: 2021/1/13 18:39
* @Description :配置本地数据到kafka时的,拦截飞json格式的数据
*
*/
public class LocalKfkInterceptor implements Interceptor {
@Override
public void initialize(){
}
/**
* 单个事件的,判断传输的数据格式是否是json的格式
* @param event
* @return
*/
@Override
public Event intercept(Event event){
String s =new Body());
try{
//如果不是json格式,则会抛异常,设置返回空值,否则返回本⾝
//通过是否有异常,决定是否删除改数据
JSON.parseObject(s);
return event;
}catch(Exception e){
return null;
}
}
@Override
public List<Event>intercept(List<Event> list){
Iterator<Event> it = list.iterator();
while(it.hasNext()){
Event event = it.next();
if(intercept(event)== null)
}
return list;
}
@Override
public void close(){
}
/**
* 需要静态内部类实现Builder
*/
public static class MyBuilder implements Builder{
@Override
public Interceptor build(){
return new LocalKfkInterceptor();
}
@Override
public void configure(Context context){
}
}
}
flume配置⽂件
#使⽤taildir source,kafka channel将监测的数据写⼊
a1.sources = r1
a1.channels = c1
#配置监控的⽅式,TAILDIR多⽬录监控,监控的⽬录中⽂件变化时才能检测到
a1.pe = TAILDIR
#设置监控组,⽤来实现多⽬录监控
a1.sources.r1.filegroups = f1
#设置监控的路径
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
#配置批次的⼤⼩
a1.sources.r1.batchSize = 100
#设置断点续传记录的位置保存的地址
a1.sources.r1.positionFile = /opt/module/flume/position.json
#设置
#将不是json格式传输的数据拦截
a1.sources.r1.interceptors = i1
#设置的类型,地址
a1.sources.r1.pe = com.atguigu.interce.LocalKfkInterceptor$MyBuilder
#配置kafka channel,
#channel类型,写⼊kafka channel的集、topic名称、是否以事件的⽅式传输(该配置需要与kafka source设置的类型⼀致) a1.pe = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.pic = first
a1.channels.c1.parseAsFlumeEvent =false
#source与channel的连接⽅式
a1.sources.r1.channels = c1
2、⽇志存储
channel的类型选择
⽅案⼀:MemoryChannel
MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适⽤于对数据质量要求不⾼的需求。
⽅案⼆:FileChannel
FileChannel传输速度相对于Memory慢,但数据安全保障⾼,Agent进程挂掉也可以从失败中恢复数据。
⽅案三:kafkaChannel
使⽤kafkachannel则不需要source,但由于需要,如果有没source则⽆法配置,(需要解决零点漂移问题)
配置
由于Flume默认会⽤Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产⽣的。Flume消费Kafka⾥⾯的数据时,有可能已经是第⼆天了,那么这部门数据会被发往第⼆天的HDFS路径。我们希望的是根据⽇志⾥⾯的实际时间,发往HDFS的路径,所以下⾯作⽤是获取⽇志中的实际时间。
解决的思路:拦截json⽇志,通过fastjson框架解析json,获取实际时间ts。将获取的ts时间写⼊header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写⼊到HDFS。
数据形式
获取ts字段
仿照默认的TimpStamp interceptor进⾏设置
官⽹Timestamp Interceptor介绍:
Timestamp Interceptor
This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor inserts a header with key timestamp (or as  specified by the header property) whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if it is already present in the configuration.
这个插⼊⼀个带有键时间戳的头(或由头属性指定的),它的值是timestamp 。如果配置中已经存在⼀个时间戳,这个可以保留它。
实现

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。