ES系列⼗⼋、FileBeat发送⽇志到logstash、ES、多个output过滤配置
⼀、FileBeat基本概念
简单概述
最近在了解ELK做⽇志采集相关的内容,这篇⽂章主要讲解通过filebeat来实现⽇志的收集。⽇志采集的⼯具有很多种,如fluentd, flume, logstash,betas等等。⾸先要知道为什么要使⽤filebeat呢?因为logstash是jvm跑的,资源消耗⽐较⼤,启动⼀个logstash就需要消耗500M左右的内存,⽽filebeat只需要10来M内存资源。常⽤的ELK⽇志采集⽅案中,⼤部分的做法就是将所有节点的⽇志内容通过filebeat送到kafka消息队列,然后使⽤logstash集读取消息队列内容,根据配置⽂件进⾏过滤。然后将过滤之后的⽂件输送到elasticsearch中,通过kibana去展⽰。
filebeat介绍
Filebeat由两个主要组成部分组成:prospector和 harvesters。这些组件⼀起⼯作来读取⽂件并将事件数据发送到您指定的output。
什么是harvesters?
harvesters负责读取单个⽂件的内容。harvesters逐⾏读取每个⽂件,并将内容发送到output中。每个⽂件都将启动⼀个harvesters。harvesters负责⽂件的打开和关闭,这意味着harvesters运⾏时,⽂件会保持打开状态。如果在收集过程中,即使删除了这个⽂件或者是对⽂件进⾏重命名,Filebeat依然会继续对这个⽂件进⾏读取,这时候将会⼀直占⽤着⽂件所对应的磁盘空间,直到Harvester关闭。默认情况下,Filebeat会⼀直保持⽂件的开启状态,直到超过配置的close_inactive参数,Filebeat才会把Harvester关闭。
关闭Harvesters会带来的影响:
file Handler将会被关闭,如果在Harvester关闭之前,读取的⽂件已经被删除或者重命名,这时候会释放之前被占⽤的磁盘资源。
当时间到达配置的scan_frequency参数,将会重新启动为⽂件内容的收集。linux怎么读取文件内容
如果在Havester关闭以后,移动或者删除了⽂件,Havester再次启动时,将会⽆法收集⽂件数据。
当需要关闭Harvester的时候,可以通过close_*配置项来控制。
什么是Prospector?
Prospector负责管理Harvsters,并且到所有需要进⾏读取的数据源。如果input type配置的是log类型,Prospector将会去配置度路径下查所有能匹配上的⽂件,然后为每⼀个⽂件创建⼀个Harvster。每个Prospector都运⾏在⾃⼰的Go routine⾥。
Filebeat⽬前⽀持两种Prospector类型:log和stdin。每个Prospector类型可以在配置⽂件定义多个。log Prospector将会检查每⼀个⽂件是否需要启动Harvster,启动的Harvster是否还在运⾏,或者是该⽂件是否被忽略(可以通过配置 ignore_order,进⾏⽂件忽略)。如果是在Filebeat运⾏过程中新创建的⽂件,只要在Harvster关闭后,⽂件⼤⼩发⽣了变化,新⽂件才会被Prospector选择到。
filebeat⼯作原理
Filebeat可以保持每个⽂件的状态,并且频繁地把⽂件状态从注册表⾥更新到磁盘。这⾥所说的⽂件状态是⽤来记录上⼀次Harvster读取⽂件时读取到的位置,以保证能把全部的⽇志数据都读取出来,然后发送给output。如果在某⼀时刻,作为output的ElasticSearch或者Logstash变成了不可⽤,Filebeat将会把最后的⽂件读取位置保存下来,直到output重新可⽤的时候,快速地恢复⽂件数据的读取。在Filebaet运⾏过程中,每个Prospector的状态信息都会保存在内存⾥。如果Filebeat出⾏了重启,完成重启之后,会从注册表⽂件⾥恢复重启之前的状态信息,让FIlebeat继续从之前已知的位置开始进⾏数据读取。
Prospector会为每⼀个到的⽂件保持状态信息。因为⽂件可以进⾏重命名或者是更改路径,所以⽂件
名和路径不⾜以⽤来识别⽂件。对于Filebeat来说,都是通过实现存储的唯⼀标识符来判断⽂件是否之前已经被采集过。
如果在你的使⽤场景中,每天会产⽣⼤量的新⽂件,你将会发现Filebeat的注册表⽂件会变得⾮常⼤。这个时候,你可以参考(),来解决这个问题。
⼆、下载FileBeat安装包
wget /downloads/beats/filebeat/filebeat-6.3.1-linux-x86_
三、FileBeat发送⽇志到ES
1、解压⽂件
tar -zxvf filebeat-6.3.1-linux-x86_
2、编辑l
l
按照要求修改输⼊和输出部分为(红⾊):
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
/home/log/*.log
#- c:\programdata\elasticsearch\logs\*
。
。。
#============================= Filebeat modules ===============================
# Glob pattern for configuration loading
path: ${fig}/modules.d/*.yml
# Set to true to enable config reloading
。。。
#============================= Filebeat modules ===============================
# Glob pattern for configuration loading
path: ${fig}/modules.d/*.yml
# Set to true to enable config reloading
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
3、启动
./filebeat -e -l -d "Publish"
4、验证
上传⽇志到⽂件到指定⽬录
⽇志内容:
{"@timestamp":"2018-09-20T01:21:02.363+08:00","@version":1,"message":"测试⽇志修改索引看看","logger_name":"ample.demo.DemoApplicationTests","thread_name":"main","level":"INFO","level_value":20000,"appName": {"@timestamp":"2018-09-20T01:21:02.364+08:00","@version":1,"message":"查询所有学⽣,pageNo1,pageSize1","logger_name":"ample.service.StudentService","thread_name":"main","level":"INFO","level_value":20000,"appName {"@timestamp":"2018-09-20T01:21:02.622+08:00","@version":1,"message":"Student(id=1, name=⼩明, classname=112, age=21, telphone=2147483647, nickName=null)","logger_name":"ample.demo.DemoApplicationTests", 5、kibana查看
四、FileBeat发送⽇志到Logstash,由logstash发送到ES
1、fileBeat配置
vim /home/filebeat-6.3.1-linux-x86_l
(只改红⾊部分其他跟上⾯配置⼀致):
#output.elasticsearch: 关闭ES配置
# Array of hosts to connect to.
#hosts: ["localhost:9200"]
output.logstash:
# The Logstash hosts
hosts: ["localhost:5044"]
2、配置Logstash
vim /home/logstash-6.3.1/config/conf.f
添加配置:
input {
beats {
port => 5044
ssl => false
codec => json #格式化成json,否则下⾯%{appname}取不到值
}
}
output {
elasticsearch {
#action => "index"
hosts => ["localhost:9200"]
index => "%{appname}-%{+YYYY.MM.dd}" #根据项⽬名称动态创建索引
template => "/home/elasticsearch-6.3.1/config/templates/logstash.json" 索引模板地址
manage_template => false #关闭logstash默认索引模板
template_name => "crawl" #映射模板的名字
template_overwrite => true #如果设置为true,模板名字⼀样的时候,新的模板会覆盖旧的模板
}
}
3、启动logstash和filebeat
/home/logstash-6.3.1/bin/logstash --path.settings /home/logstash-6.3.1/config/ -f /home/logstash-6.3.1/config/conf.f &
/home/filebeat-6.3.1-linux-x86_64/filebeat -e -l -d "Publish" &
4、验证
拷贝⽇志⽂件ELK-2018-09-20.log到/home/log⽂件下
内容如下:
{"@timestamp":"2018-09-20T01:56:55.293+08:00","@version":1,"message":"今天是中秋节放假111,pageNo1,pageSize1","logger_name":"ample.service.StudentService","thread_name":"main","level":"INFO","level_value":20000 5、打开kibana
五、logstash多个output配置
1、修改配置⽂件、
input {
tcp {
port => 10514
codec => "json"
}
}
input {
beats {
port => 5044
ssl => false
codec => json
}
}
output {
elasticsearch {
#action => "index"
hosts => ["localhost:9200"]
index => "%{appname}-%{+YYYY.MM.dd}"
template => "/home/elasticsearch-6.3.1/config/templates/logstash.json"
manage_template => false #关闭logstash⾃动管理模板功能
template_name => "crawl" #映射模板的名字
template_overwrite => true
}
if [level] == "ERROR" {
elasticsearch {
#action => "index"
hosts => ["localhost:9200"]
index => "%{appname}-error-%{+YYYY.MM.dd}"
template => "/home/elasticsearch-6.3.1/config/templates/logstash.json"
manage_template => false #关闭logstash⾃动管理模板功能
template_name => "crawl" #映射模板的名字
template_overwrite => true
}
}
}
output {
stdout {
codec => rubydebug
}
}
打开kibana另外⼀个索引中只有errorr⽇志
六、logback⽣成ELK⽇志中⽂乱码问题
⾃定义json过滤器
<!-- 输出到ELK⽂件 -->
<appender name="elkLog"
class="ch.olling.RollingFileAppender">
<file>${LOGPATH}${file.separator}ELK-${TIMESTAMP}.log</file>
<append>true</append>
<encoder charset="UTF-8"class="net.der.LogstashEncoder" >
<jsonFactoryDecorator class="ample.logback.MyJsonFactoryDecorator" />
<customFields>{"appname":"${appName}"}</customFields>
</encoder>
<rollingPolicy class="ch.olling.TimeBasedRollingPolicy">
<fileNamePattern>${LOGPATH}${file.separator}all${file.separator}%d{yyyy-MM-dd}.log</fileNamePattern> <maxHistory>30</maxHistory>
</rollingPolicy>
<triggeringPolicy class="ch.olling.SizeBasedTriggeringPolicy">
<MaxFileSize>10MB</MaxFileSize>
</triggeringPolicy>
</appender>
java类
ample.logback;
import com.JsonGenerator;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import net.logstash.logback.decorate.JsonFactoryDecorator;
public class MyJsonFactoryDecorator implements JsonFactoryDecorator {
@Override
public MappingJsonFactory decorate(MappingJsonFactory factory) {
// 禁⽤对⾮ascii码进⾏escape编码的特性
factory.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);
return factory;
}
}
七、logstash+elasticsearch配置索引模板
在使⽤logstash收集⽇志的时候,我们⼀般会使⽤logstash⾃带的动态索引模板,虽然⽆须我们做任何定制操作,就能把我们的⽇志数据推送到elasticsearch索引集中,但是在我们查询的时候,就会发现,默认的索引模板常常把我们不需要分词的字段,给分词了,这样以来,我们的⽐较重要的聚合统计就不准确了:
如果使⽤的是logstash的默认模板,它会按-切分机器名,这样以来想统计那台机器上的收集⽇志最多就有问题了,所以这时候,就需要我们⾃定义⼀些索引模板了:
在logstash与elasticsearch集成的时候,总共有如下⼏种使⽤模板的⽅式:
(1)使⽤默认⾃带的索引模板,⼤部分的字段都会分词,适合开发和时候快速验证使⽤
(2)在logstash收集端⾃定义配置模板,因为分散在收集机器上,维护⽐较⿇烦
(3)在elasticsearc服务端⾃定义配置模板,由elasticsearch负责加载模板,可动态更改,全局⽣效,维护⽐较容易
以上⼏种⽅式:
使⽤第⼀种,最简单,⽆须任何配置
使⽤第⼆种,适合⼩规模集的⽇志收集,需要在logstash的output插件中使⽤template指定本机器上的⼀个模板json路径,例如 template => "/tmp/logstash.json"
使⽤第三种,适合⼤规模集的⽇志收集,如何配置,主要配置logstash的output插件中两个参数:
manage_template => false//关闭logstash⾃动管理模板功能
template_name => "crawl"//映射模板的名字
如果使⽤了,第三种需要在elasticsearch的集中的config/templates路径下配置模板json,在elasticsearch中索引模板可分为两种:
1、静态模板
适合索引字段数据固定的场景,⼀旦配置完成,不能向⾥⾯加⼊多余的字段,否则会报错
优点:scheam已知,业务场景明确,不容易出现因字段随便映射从⽽造成元数据撑爆es内存,从⽽导致es集全部宕机
缺点:字段数多的情况下配置稍繁琐
⼀个静态索引模板配置例⼦如下:
{
"crawl" : {
"template": "crawl-*",
"settings": {
"index.number_of_shards": 3,
"number_of_replicas": 0
},
"mappings" : {
"logs" : {
"properties" : {
"@timestamp" : {
"type" : "date",
"format" : "dateOptionalTime",
"doc_values" : true
},
"@version" : {
"type" : "string",
"index" : "not_analyzed",
"doc_values" : true
},
"cid" : {
"type" : "string",
"index" : "not_analyzed"
},
"crow" : {
"type" : "string",
"index" : "not_analyzed"
},
"erow" : {
"type" : "string",
"index" : "not_analyzed"
},
"host" : {
"type" : "string",
"index" : "not_analyzed"
},
"httpcode" : {
"type" : "string",
"index" : "not_analyzed"
},
"message" : {
"type" : "string"
},
"path" : {
"type" : "string"
},
"pcode" : {
"type" : "string",
"index" : "not_analyzed"
},
"pro" : {
"type" : "string",
"index" : "not_analyzed"
},
"ptype" : {
"type" : "string",
"index" : "not_analyzed"
},
"save" : {
"type" : "string",
"index" : "not_analyzed"
},
"t1" : {
"type" : "string",
"index" : "not_analyzed"
},
"t2" : {
"type" : "string",
"index" : "not_analyzed"
},
"t3" : {
"type" : "string",
"index" : "not_analyzed"
},
"url" : {
"type" : "string"
}
}
}
}
}
}
2、动态模板
适合字段数不明确,⼤量字段的配置类型相同的场景,多加字段不会报错
优点:可动态添加任意字段,⽆须改动scheaml,
缺点:如果添加的字段⾮常多,有可能造成es集宕机
如下的⼀个logstash的动态索引模板,只设置message字段分词,其他的字段默认不分词{
"template" : "crawl-*",
"settings" : {
"index.number_of_shards": 5,
"number_of_replicas": 0
},
"mappings" : {
"_default_" : {
"_all" : {"enabled" : true, "omit_norms" : true},
"dynamic_templates" : [ {
"message_field" : {
"match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "string", "index" : "analyzed", "omit_norms" : true,
"fielddata" : { "format" : "disabled" }
}
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "string", "index" : "not_analyzed", "doc_values" : true
}
}
} ],
"properties" : {
"@timestamp": { "type": "date" },
"@version": { "type": "string", "index": "not_analyzed" },
"geoip" : {
"dynamic": true,
"properties" : {
"ip": { "type": "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "float" },
"longitude" : { "type" : "float" }
}
}
}
}
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论