ELK收集nginx日志

一、架构图:

ELK收集nginx日志

二、日志收集方式:

ELK收集日志常用的有两种方式,分别是:
(1)不修改源日志的格式,而是通过logstash的grok方式进行过滤、清洗,将原始无规则的日志转换为规则的日志。
(2)修改源日志输出格式,按照需要的日志格式输出规则日志,logstash只负责日志的收集和传输,不对日志做任何的过滤清洗。
这两种方式各有优缺点:
第一种方式不用修改原始日志输出格式,直接通过logstash的grok方式进行过滤分析,好处是对线上业务系统无任何影响,缺点是logstash的grok方式在高压力情况下会成为性能瓶颈,如果要分析的日志量超大时,日志过滤分析可能阻塞正常的日志输出。因此,在使用logstash时,能不用grok的,尽量不使用grok过滤功能。
第二种方式缺点是需要事先定义好日志的输出格式,这可能有一定工作量,但优点更明显,因为已经定义好了需要的日志输出格式,logstash只负责日志的收集和传输,这样就大大减轻了logstash的负担,可以更高效的收集和传输日志。另外,目前常见的web服务器,例如apache、nginx等都支持自定义日志输出格式。因此,在企业实际应用中,第二种方式是首选方案。

三、配置filebeat

filebeat是安装在Nginx服务器上:

filebeat.inputs:
? - type: log
? enabled: true
? paths:
? - /var/log/nginx/access.log
? fields:
? log_topic: nginxlogs
? filebeat.config.modules:
? path: ${path.config}/modules.d/*.yml
? reload.enabled: false
? name: 172.16.213.157
? output.kafka:
? enabled: true
? hosts: ["172.16.213.51:9092", "172.16.213.75:9092", "172.16.213.109:9092"]
? version: "0.10"
? topic: ‘%{[fields.log_topic]}‘
? partition.round_robin:
? reachable_only: true
? worker: 2
? required_acks: 1
? compression: gzip
? max_message_bytes: 10000000
? logging.level: debug
? 这个配置文件中,是将Nginx的访问日志/var/log/nginx/access.log内容实时的发送到kafka集群topic为nginxlogs中。需要注意的是filebeat输出日志到kafka中配置文件的写法。
? 配置完成后,启动filebeat即可:
? [root@filebeatserver ~]# cd /usr/local/filebeat
? [root@filebeatserver filebeat]# nohup ./filebeat -e -c filebeat.yml &
? 启动完成后,可查看filebeat的启动日志,观察启动是否正常。

?

四、配置logstash

由于在Nginx输出日志中已经定义好了日志格式,因此在logstash中就不需要对日志进行过滤和分析操作了,下面直接给出logstash事件配置文件kafka_nginx_into_es.conf的内容:

input {
? kafka {
? bootstrap_servers => "172.16.213.51:9092,172.16.213.75:9092,172.16.213.109:9092"
? topics =>"nginxlogs" #指定输入源中需要从哪个topic中读取数据,这里会自动新建一个名为nginxlogs的topic
? group_id => "logstash"
? codec => json {
? charset => "UTF-8"
? }
? add_field => { "[@metadata][myid]" => "nginxaccess-log" } #增加一个字段,用于标识和判断,在output输出中会用到。
? }
? }
? ?
? filter {
? if [@metadata][myid] == "nginxaccess-log" {
? mutate {
? gsub => ["message", "\\x", "\\\x"] #这里的message就是message字段,也就是日志的内容。这个插件的作用是将message字段内容中UTF-8单字节编码做替换处理,这是为了应对URL有中文出现的情况。
? }
? if ( ‘method":"HEAD‘ in [message] ) { #如果message字段中有HEAD请求,就删除此条信息。
? drop {}
? }
? json {
? source => "message"
? remove_field => "prospector"
? remove_field => "beat"
? remove_field => "source"
? remove_field => "input"
? remove_field => "offset"
? remove_field => "fields"
? remove_field => "host"
? remove_field => "@version“
? remove_field => "message"
? }
? }
? }
? output {
? if [@metadata][myid] == "nginxaccess-log" {
? elasticsearch {
? hosts => ["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
? index => "logstash_nginxlogs-%{+YYYY.MM.dd}" #指定Nginx日志在elasticsearch中索引的名称,这个名称会在Kibana中用到。索引的名称推荐以logstash开头,后面跟上索引标识和时间。
? }
? }
? }

这个logstash事件配置文件非常简单,没对日志格式或逻辑做任何特殊处理,由于整个配置文件跟elk收集apache日志的配置文件基本相同,因此不再做过多介绍。所有配置完成后,就可以启动logstash了,执行如下命令:
[root@logstashserver ~]# cd /usr/local/logstash
[root@logstashserver logstash]# nohup bin/logstash -f kafka_nginx_into_es.conf &
logstash启动后,可以通过查看logstash日志,观察是否启动正常,如果启动失败,会在日志中有启动失败提示。

五、配置Kibana

Filebeat从nginx上收集数据到kafka,然后logstash从kafka拉取数据,如果数据能够正确发送到elasticsearch,我们就可以在Kibana中配置索引了。

登录Kibana,首先配置一个index_pattern,点击kibana左侧导航中的Management菜单,然后选择右侧的Index Patterns按钮,最后点击左上角的Create index pattern,开始创建一个index pattern,如下图所示:

ELK收集nginx日志

ELK收集nginx日志

ELK收集nginx日志

至此,ELK收集Nginx日志的配置工作完成

上一篇:[转]创建数据库用户


下一篇:什么是Vue