Collecting IIS logs into Elasticsearch with Filebeat

When the need to collect IIS logs comes up at work, the Filebeat component is a good fit for shipping them.

Parsing the IIS log format:

Sample log entries:

#Software: Microsoft Internet Information Services 7.5
#Version: 1.0
#Date: 2019-03-14 00:00:00
#Fields: date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) sc-status sc-substatus sc-win32-status time-taken
2019-03-14 00:00:00 10.78.1.205 POST /EWS/Exchange.asmx ;RC:eb8ae63b-0fa6-4335-8595-78e3237566cd;Init>>Conn:0,HangingConn:0,AD:30000/30000/0%,CAS:54000/54000/0%,AB:30000/30000/0%,RPC:36000/36000/0%,FC:1000/0,Policy:DefaultThrottlingPolicy_ad9fd210-3d2c-4f73-968c-d5ca7a72dfd0,Norm,Sub:5000/0;SoapAction=GetUserOofSettingsResponse;MailboxRPCRequests=9;MailboxRPCLatency=0;ADRequests=2;ADLatency=0;TimeInGetUserOOFSettings=37;[C]Queues:0msec/Execute:31.2msec;End(46.8ms)>>Conn:1,HangingConn:0,AD:30000/30000/0%,CAS:54000/53954/1%,AB:30000/30000/0%,RPC:36000/36000/0%,FC:1000/0,Policy:DefaultThrottlingPolicy_ad9fd210-3d2c-4f73-968c-d5ca7a72dfd0,Norm[Resources:(Mdb)SADV(Health:-1%,HistLoad:0),],Sub:5000/0; 443 STAPLESCN\SH102247 10.78.9.29 Microsoft+Office/16.0+(Windows+NT+6.1;+Microsoft+Outlook+16.0.4738;+Pro) 200 0 0 124
2019-03-14 00:00:02 10.78.1.205 POST /ews/exchange.asmx - 443 - 10.78.1.208 MS-WebServices/1.0 401 0 0 31

Grok pattern for parsing the log entries above:

%{TIMESTAMP_ISO8601:log_timestamp} (%{IPORHOST:s_ip}|-) (%{WORD:cs_method}|-) %{NOTSPACE:cs_uri_stem} (%{NOTSPACE:cs_uri_query}|-) (%{NUMBER:s_port}|-) %{NOTSPACE:cs_username} (%{IPORHOST:c_ip}|-) %{NOTSPACE:cs_useragent} (%{NUMBER:sc_status}|-) (%{NUMBER:sc_substatus}|-) (%{NUMBER:sc_win32_status}|-) (%{NUMBER:time_taken}|-)
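
For example, applying this pattern to the second sample line above should produce roughly the following fields (a sketch of the expected grok captures, not output copied from a running pipeline):

log_timestamp   => 2019-03-14 00:00:02
s_ip            => 10.78.1.205
cs_method       => POST
cs_uri_stem     => /ews/exchange.asmx
cs_uri_query    => -
s_port          => 443
cs_username     => -
c_ip            => 10.78.1.208
cs_useragent    => MS-WebServices/1.0
sc_status       => 401
sc_substatus    => 0
sc_win32_status => 0
time_taken      => 31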

Filebeat configuration on Windows, with output to Kafka:

filebeat.inputs:
- type: log
  paths:
    - C:\inetpub\logs\LogFiles\W3SVC1\*
  #ignore_older: 2h
  # skip the W3C header lines that begin with '#'
  exclude_lines: ['^#']
  tags: ["iis-206"]
  # custom fields used downstream: 'type' drives the Logstash output condition,
  # 'log_topic' selects the Kafka topic
  fields:
    type: "iis"
    log_topic: "iis"
  fields_under_root: true

output.kafka:
  enabled: true
  hosts: ["10.78.1.85:9092","10.78.1.87:9092","10.78.1.71:9092"]
  topic: "%{[log_topic]}"   # resolved from the custom log_topic field, i.e. "iis"
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
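
Before relying on the shipper, the configuration and the Kafka connectivity can be checked from a PowerShell prompt in the Filebeat directory. This is only a sketch: it assumes the Filebeat zip was extracted to C:\Program Files\Filebeat (the path used in the official Windows install steps) and uses the test subcommands and service script shipped with Filebeat 6.x/7.x.

cd 'C:\Program Files\Filebeat'                  # assumed install directory
.\filebeat.exe test config -c .\filebeat.yml    # validate the YAML
.\filebeat.exe test output -c .\filebeat.yml    # check connectivity to the Kafka brokers
.\filebeat.exe -e -c .\filebeat.yml             # run in the foreground for a quick look at the events

# once it works, install and start Filebeat as a Windows service
PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1
Start-Service filebeat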


Logstash reads the logs from Kafka and parses them:

input {
    kafka {
        bootstrap_servers => "10.78.1.85:9092,10.78.1.87:9092,10.78.1.71:9092"
        topics => ["iis"]
        codec => "json"
    }
}

filter {
    # parse the W3C fields out of the raw message
    grok {
        match => ["message","%{TIMESTAMP_ISO8601:log_timestamp} (%{IPORHOST:s_ip}|-) (%{WORD:cs_method}|-) %{NOTSPACE:cs_uri_stem} (%{NOTSPACE:cs_uri_query}|-) (%{NUMBER:s_port}|-) %{NOTSPACE:cs_username} (%{IPORHOST:c_ip}|-) %{NOTSPACE:cs_useragent} (%{NUMBER:sc_status}|-) (%{NUMBER:sc_substatus}|-) (%{NUMBER:sc_win32_status}|-) (%{NUMBER:time_taken}|-)"]
    }
    # IIS logs timestamps in UTC; see the timezone note at the end
    date {
            match => ["log_timestamp","yyyy-MM-dd HH:mm:ss"]
            #timezone => "Asia/Shanghai"
            timezone => "+00:00"
            target => "@timestamp"
    }
    mutate {
        remove_field => ["@version","beat","message","log_timestamp"]
    }
}
output {
    if [type] == "iis" {
        #if [tags][0] == "iis" {
            elasticsearch {
                hosts  => ["10.10.5.78:9200","10.10.5.79:9200","10.10.5.80:9200"]
                index  => "iis-%{+YYYY.MM.dd}"
            }
            #stdout { codec=> rubydebug }
        #}
    }
}
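
To confirm the whole pipeline works, check that events are arriving in the Kafka topic and that the daily index shows up in Elasticsearch. A sketch using the stock Kafka console consumer and the Elasticsearch _cat API, with the hosts from the configs above:

# read a few events from the "iis" topic on one of the brokers
bin/kafka-console-consumer.sh --bootstrap-server 10.78.1.85:9092 \
    --topic iis --from-beginning --max-messages 5

# list the daily IIS indices and their document counts
curl -s 'http://10.10.5.78:9200/_cat/indices/iis-*?v'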

About the timezone issue:

IIS records log timestamps in UTC (the default for the W3C log format), so the logged time appears 8 hours behind the local (UTC+8) system time. Getting IIS itself to log local time is cumbersome, so it is easier to have Logstash handle the offset while parsing.

Add timezone => "+00:00" to the date filter in the filter block: this tells Logstash that log_timestamp is UTC, @timestamp is then stored in UTC in Elasticsearch, and Kibana converts it to the browser's timezone for display.
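
Taking the second sample entry above, the timestamp flows through the pipeline like this (assuming Kibana is viewed from a browser in the UTC+8 timezone):

log_timestamp in the IIS log (UTC):          2019-03-14 00:00:02
@timestamp stored in Elasticsearch (UTC):    2019-03-14T00:00:02.000Z
time displayed by Kibana (browser, UTC+8):   2019-03-14 08:00:02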




