Logstash——解析各类⽇志⽂件
原理
使⽤filebeat来上传⽇志数据,logstash进⾏⽇志收集与处理,elasticsearch作为⽇志存储与搜索引擎,最后使⽤kibana展现⽇志的可视化输出。所以不难发现,⽇志解析主要还是logstash做的事情。
从上图中可以看到,logstash主要包含三⼤模块:
1. INPUTS: 收集所有数据源的⽇志数据([源有file、redis、beats等,filebeat就是使⽤了beats源*);
2. FILTERS: 解析、整理⽇志数据(本⽂重点);
3. OUTPUTS: 将解析的⽇志数据输出⾄存储器([elasticseach、file、syslog等);
FILTERS是重点,来看看它常⽤到的⼏个插件:
1. grok:采⽤正则的⽅式,解析原始⽇志格式,使其结构化;
2. geoip:根据IP字段,解析出对应的地理位置、经纬度等;
3. date:解析选定时间字段,将其时间作为logstash每条记录产⽣的时间(若没有指定该字段,默认使⽤read line的时间作为该条记录时间);
*注意:codec也是经常会使⽤到的,它主要作⽤在INPUTS和OUTPUTS中,[提供有json的格式转换、multiline的多⾏⽇志合并等
配置⽂件
⼀个简单的配置⽂件:
input {
log4j {
port => "5400"
}
beats {
port => "5044"
}
}
filter { # 多个过滤器会按声明的先后顺序执⾏
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {
action => "index"
hosts => "127.0.0.1:9200" # 或者 ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] ,⽀持均衡的写⼊ES的多个节点,⼀般为⾮master节点
index => "logstash-%{+YYYY-MM}"
}
stdout {
codec=> rubydebug
}
file {
path => "/path/to/target/file"
}
}
场景
1. NodeJS ⽇志
⽇志格式
$time - $remote_addr $log_level $path - $msg
⽇志内容
2017-03-15 18:34:14.535 - 112.65.171.98 INFO /root/ws/socketIo.js - xxxxxx与ws server断开连接
filebeat配置(建议filebeat使⽤rpm安装,以systemctl start filebeat⽅式启动)
filebeat:
prospectors:
- document_type: nodejs #申明type字段为nodejs,默认为log
paths:
- /var/log/nodejs/log #⽇志⽂件地址
input_type: log #从⽂件中读取
tail_files: true #以⽂件末尾开始读取数据
output:
logstash:
hosts: ["${LOGSTASH_IP}:5044"]
#General Setting
name: "server1" #设置beat的名称,默认为主机hostname
logstash中FILTERS配置
filter {
if [type] == "nodejs" { #根据filebeat中设置的type字段,来过滤不同的解析规则
grok{
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} - %{IPORHOST:clientip} %{LOGLEVEL:level} %{PATH:path} - %{GREEDYDATA:msg}" }
}
geoip {
source => "clientip" #填写IP字段
}
}
}
结果(为⽅便演⽰,数据有删减)
Filter配置讲解
1. grok中的match内容:
1. key:表⽰所需解析的内容;
2. value:表⽰解析的匹配规则,提取出对应的字段;
3. 解析语法:%{正则模板:⾃定义字段},其中TIMESTAMP_ISO8601、IPORHOST等都是grok提供的正则模板;
2. geoip:通过分析IP值,产⽣IP对应的地理位置信息;
这⾥是否发现@timestamp与timestamp不⼀致,@timestamp表⽰该⽇志的读取时间,在elasticsearch中作为时间检索索引。下⾯讲解Nginx⽇志时,会去修正这⼀问题。
2. Nginx 访问⽇志
⽇志格式
$remote_addr - $remote_user [$time_local]
"$request" $status $body_bytes_sent "$http_referer"
"$http_user_agent" "$http_x_forwarded_for"
⽇志内容
112.65.171.98 - - [15/Mar/2017:18:18:06 +0800] "GET /index.html HTTP/1.1" 200 1150 "urdomain/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/53 filebeat中prospectors的配置
- document_type: nginx
paths:
- /var/log/nginx/access.log #⽇志⽂件地址
input_type: log #从⽂件中读取
tail_files: true #以⽂件末尾开始读取数据
logstash中FILTERS配置
filter {
if [type] == "nginx" {
grok{
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601" ]
target => "@timestamp" #可省略
}
}
}
结果
Filter配置讲解
1. grok:
1. 是不是很不可思议,上⼀⽰例中我们匹配规则写了⼀长串,这个仅仅⼀个COMBINEDAPACHELOG就搞定了!
2. grok除了提供上⾯那种基础的正则规则,还对常⽤的⽇志(java,http,syslog等)提供的相应解析模板,本质还是那么⼀长串正则,[详情见grok的120中正则模板;
2. date:
1. match:数组中第⼀个值为要匹配的时间字段,后⾯的n个是匹配规则,它们的关系是or的关系,满⾜⼀个即可;
2. target:将match中匹配的时间替换该字段,默认替换@timestamp;
⽬前为⽌我们解析的都是单⾏的⽇志,向JAVA这样的,若果是多⾏的⽇志我们⼜该怎么做呢?
3. JAVA Log4j ⽇志
⽇志内容
'2017-03-16 15:52:39,580 ERROR TestController:26 - test:
java.lang.NullPointerException
sts(TestController.java:22)
flect.NativeMethodAccessorImpl.invoke0(Native Method)
flect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
flect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at flect.Method.invoke(Method.java:497)
at org.hod.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.hod.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137)'
filebeat中prospectors的配置
- document_type: tomcat
paths:
- /var/log/java/log #⽇志⽂件地址
input_type: log #从⽂件中读取
tail_files: true #以⽂件末尾开始读取数据
multiline:
pattern: ^\d{4}
match: after
negate: true
logstash中FILTERS配置
filter {
if [type] == "tomcat" {
grok{
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVALOGMESSAGE:msg}" }
}
date {
match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,S", "ISO8601" ]
}
}
}
结果
Filebeat配置讲解
1. multiline 合并多⾏⽇志:
1. pattern:匹配规则,这⾥指匹配每条⽇志开始的年份;
2. match:有before与after,这⾥指从该⾏开始向后匹配;
3. negate:是否开始⼀个新记录,这⾥指当pattern匹配后,结束之前的记录,创建⼀条新⽇志记录;
当然在logstash input中使⽤codec multiline设置是⼀样的
⼩技巧:关于grok的正则匹配,官⽅有给出⽅法,在这上⾯提供了debugger、⾃动匹配等⼯具,⽅便⼤家编写匹配规则
正则匹配多行ES Output插件
主要的选项包括:
# action,默认是index,索引⽂档(logstash的事件)(ES架构与核⼼概念参考)。
# host,声明ES服务器地址端⼝
# index,事件写⼊的ES index,默认是logstash-%{+YYYY.MM.dd},按天分⽚index,⼀般来说我们会按照时间分⽚,时间格式参考/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html。详情请参考我的另外⼀篇⽂章:
参考:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论