Filebeat+Kafka+ELK

首先介绍一下什么是Filebeat+Kafka+ELK,ELK+Filebeat就是通过Filebeat收集数据,然后通过logstash发送到es上,然后直接发送的话,如果数据过大,或者遇到其他别的一些问题,在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。所以我们使用Kafka通过异步处理请求,从而缓解系统的压力。Kafka就是消息队列,把Filebeat收集的数据输出到Kafka,然后通过logstash把经过消息队列排序的消息有序的发往es储存,从而避免一个时间系统请求过多带来的系统崩盘。

一、环境准备

node1节点(内存4G):192.168.235.101 部署ES、Kibana、Filebeat
node2节点(内存4G):192.168.235.102 部署ES
apache节点:192.168.235.103 部署logstash、apache
zookeeper集群(部署zookeeper):
server1:192.168.235.104
server2:192.168.235.105
server3:192.168.235.106
关闭防火墙
以下实验建立在以上环境当中,已经全部完成,详情见之前博客。

二、安装kafka

在zookeeper集群中三台机子上均要安装
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
修改配置文件
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
修改环境变量
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
配置启动脚本
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK

#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
	echo "---------- Kafka 启动 ------------"
	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
	echo "---------- Kafka 停止 ------------"
	${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
	$0 stop
	$0 start
;;
status)
	echo "---------- Kafka 状态 ------------"
	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
	if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

Filebeat+Kafka+ELK
创建topic
Filebeat+Kafka+ELK

查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181
查看某个 topic 的详情
kafka-topics.sh  --describe --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181
发布消息
kafka-console-producer.sh --broker-list 192.168.235.104:9092,192.168.235.105:9092,192.168.235.106:9092  --topic name1
消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.235.104:9092,192.168.235.105:9092,192.168.235.106:9092 --topic name1 --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
修改分区数
kafka-topics.sh --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181 --alter --topic name1 --partitions 6
删除 topic
kafka-topics.sh --delete --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181 --topic name1

三、Filebeat+Kafka+ELK

修改Filebeat配置文件
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
启动 filebeat
Filebeat+Kafka+ELK
在logstash上新建一个配置文件
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK

input {
    kafka {
        bootstrap_servers => "192.168.235.104:9092,192.168.235.105:9092,192.168.235.106:9092"
        topics  => "filebeat_name1"
        group_id => "name123"
        auto_offset_reset => "earliest"
    }
}

output {
    elasticsearch {
        hosts => ["192.168.235.101:9200"]
        index => "filebeat-%{+YYYY.MM.dd}"
    }
    stdout {
        codec => rubydebug
    }
}

启动 logstash
Filebeat+Kafka+ELK
登录 Kibana
Filebeat+Kafka+ELK
Filebeat+Kafka+ELK
可以查看到node1节点的信息,以上的操作是使用filebeat收集了node1的数据,传送到zookeeper集群,zookeeper这边是通过kafka接受,kafka作为一个消息队列,他可以对数据进行处理,然后传送给通过logstash传送到es中进行储存,最后通过kibana展示出来。

上一篇:elk集群之grafana(8)


下一篇:第二十三章 Centos7下Docker安装kibana