ELK之filebeat多个文件日志收集

ELK之filebeat多个文件日志收集


filebeat日志收集实战:

  • 架构规划:
    • 在下面的图当中从左向右看,当要访问ELK日志统计平台的时候,首先访问的是两台nginx+keepalived做的负载高可用,访问的地址是keepalived的IP,当一台nginx代理服务器挂掉之后也不影响访问,然后nginx将请求转发到kibana,kibana再去elasticsearch获取数据,elasticsearch是两台做的集群,数据会随机保存在任意一台elasticsearch服务器,redis服务器做数据的临时保存,避免web服务器日志量过大的时候造成的数据收集与保存不一致导致的日志丢失,可以临时保存到redis,redis可以是集群,然后再由logstash服务器在非高峰时期从redis持续的取出即可,另外有一台mysql数据库服务器,用于持久化保存特定的数据,web服务器的日志由filebeat收集之后发送给另外的一台logstash,再有其写入到redis即可完成日志的收集,从图中可以看出,redis服务器处于前端结合的最中间,其左右都要依赖于redis的正常运行,web服务删个日志经过filebeat收集之后通过日志转发层的logstash写入到redis不同的key当中,然后提取层logstash再从redis将数据提取并安按照不同的类型写入到elasticsearch的不同index当中,用户最终通过nginx代理的kibana查看到收集到的日志的具体内容:

1:filebeat收集日志转发至logstash(基于beats模块)

官方文档:https://www.elastic.co/guide/en/beats/filebeat/current/logstash-output.html

目前只收集了系统日志,下面将tomcat的访问日志和启动时生成的catalina.txt文件的日志进行收集,另外测试多行匹配,并将输出改为logstash进根据日志类型判断写入到不同的redis key当中,在一个filebeat服务上面同时收集不同类型的日志,比如收集系统日志的时候还要收集tomcat的访问日志,那么直接带来的问题就是要在写入至redis的时候要根据不同的日志类型写入到reids不通的key当中,首先通过logstash监听一个端口,并做标准输出测试,具体配置为:

配置Logstash配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
~]# cd /etc/logstash/conf.d/
conf.d]# cat beats-test.conf
input {
beats {
port => 5044
codec => "json" #指定编码格式
}
}

#将输出改为文件进行临时输出测试
output {
file {
path => "/tmp/filebeat.txt"
}
}

logstash配置文件测试语法

1
2
conf.d]# /usr/share/logstash/bin/logstash -f  /etc/logstash/conf.d/beats-test.conf  -t
conf.d]# systemctl restart logstash #重启服务

2:更改web服务器的filebeat配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#配置filebeat配置文件,配置收集nginx的访问日志,标准输出至Logstash中

~]# vim /etc/filebeat/filebeat.yml
12 filebeat.prospectors:

18 - input_type: log
21 paths:
22 - /var/log/nginx/access.log

39 fields:
40 type: nginx-accesslog

116 output.logstash:
117 hosts: ["172.18.135.2:5044"] #logstash 服务器地址,可以是多个
118 enabled: true #是否开启输出至logstash,默认即为true
119 worker: 1 #工作线程数
120 #compression_level: 3 #压缩级别,数值越大,压缩比越高(1~9),一般情况下不使用,因为会占用cpu资源
121 #loadbalance: true #多个输出的时候开启负载,在输出到多个logstash时开启此项

重启filebeat

1
~]# systemctl  restart filebeat

3:客户端模拟访问web服务器,logstash查看是记录日志文件

4:Logstash中配置将filebeat发送来的日志发送到redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
前面配置logstash是将filebeat发来的日志信息,存储到本地的文件中

编辑logstahs配置文件

~]# cd /etc/logstash/conf.d/
conf.d]# cat beats-test.conf
input {
beats {
port => 5044
codec => "json" #指定编码格式
}
}

#将输出改为文件进行临时输出测试
output {
if [fields] [type] == "nginx-accesslog" {
redis {
host => "172.18.135.2" #redis服务器的地址
prot => "6379"
password => "123456"
data_type => "list"
key => "nginx-accesslog" #日志数据存储到redis时的key的名称
db => 1
codec => "json"
}
}
}

logstash配置文件测试语法

1
2
conf.d]# /usr/share/logstash/bin/logstash -f  /etc/logstash/conf.d/beats-test.conf  -t
conf.d]# systemctl restart logstash #重启服务

5:在redis上查看是否已经存入1库中

6:配置logstash从redis中读取键值并存储到ES集群中做日志收集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
~]# vim /etc/logstash/conf.d/redis-systemlog-es.conf 

input {
redis {
host => "172.18.135.2" #redis主机地址
password => "123456"
port => "6379"
db => "1"
key => "nginx-accesslog"
data_type => "list"
codec => "json"
}
}


output {
if [fields][type] == "nginx-accesslog" {
elasticsearch {
hosts => ["172.18.135.1:9200"] #ES集群主机地址
index => "nginx-log-135-1-%{+YYYY.MM.dd}"
codec => "json"
}}
}

语法检测并启动

1
2
~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis-systemlog-es.conf -t
~]# systemctl restart logstash

7:ES主机上head插件查看日志索引是否存储

8:kibana根据定义的索引展示日志记录


filebeat收集多类型的日志文件

1:编辑用来收集日志的filebeat配置文件,收集本机的多个日子文件(收集本机上的nginx的访问日志和本机的所有的文件),发送到Logstash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
~]# grep -v "#" /etc/filebeat/filebeat.yml | grep -v "^$"
filebeat.prospectors:
- input_type: log
paths:
- /var/log/nginx/access.log
fields:
type: nginx-accesslog #标识nginx访问日志的类型(需要将nginx的日志类型改为json)
- input_type: log
paths:
- /var/log/message
fields:
type: systemlog #标识message日志的类型
output.logstash:
hosts: ["172.18.135.1:5044"]
enable: true
worker: 1
compression_level: 5


2:编辑logstash配置文件,将filebeat发送来的日志信息,根据定义的type类型存储到redis中
```bash
~]# cd /etc/logstash/conf.d/
conf.d]# cat beats-test.conf
input {
beats {
port => 5044
codec => "json" #指定编码格式
}
}

#将输出改为文件进行临时输出测试
output {
if [fields] [type] == "nginx-accesslog" {
redis {
host => "172.18.135.2" #redis服务器的地址
prot => "6379"
password => "123456"
data_type => "list"
key => "nginx-accesslog" #日志数据存储到redis时的key的名称
db => 1
codec => "json"
}}
if [fields] [type] == "systemlog" {
redis {
host => "172.18.135.2" #redis服务器的地址
prot => "6379"
password => "123456"
data_type => "list"
key => "systemlog" #日志数据存储到redis时的key的名称
db => 1
}}
}

语法检测并启动

1
2
~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis-systemlog-es.conf -t
~]# systemctl restart logstash

3:在redis上查看是否已经存入1库中

4:编辑另一台logstash主机配置文件,从redis中服务日志键值,存储到后端的ES存储集群中,进行日志存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
~]# vim /etc/logstash/conf.d/redis-systemlog-es.conf 

input {
redis {
host => "172.18.135.2" #redis主机地址
password => "123456"
port => "6379"
db => "1"
key => "nginx-accesslog"
data_type => "list"
codec => "json"
}
redis {
host => "172.18.135.2" #redis主机地址
password => "123456"
port => "6379"
db => "1"
key => "systemlog"
data_type => "list"
#codec => "json"
}
}


output {
if [fields][type] == "nginx-accesslog" {
elasticsearch {
hosts => ["172.18.135.1:9200"] #ES集群主机地址
index => "nginx-log-135-1-%{+YYYY.MM.dd}"
codec => "json"
}}
if [fields][type] == "systemlog" {
elasticsearch {
hosts => ["172.18.135.1:9200"] #ES集群主机地址
index => "message-log-135-1-%{+YYYY.MM.dd}"
codec => "json"
}}
}

语法检测并启动

1
2
~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis-systemlog-es.conf -t
~]# systemctl restart logstash

5:ES主机上head插件查看日志索引是否存储

6:kibana根据定义的索引展示日志记录


脚本编写进行监控

  • 依据判断logstash存储到redis数据的数据长度来判断存储redis数据的logstash程序是否已经宕机
  • 实际环境当中,可能会出现reids当中堆积了大量的数据而logstash由于种种原因未能及时提取日志,此时会导致redis服务器的内存被大量使用,甚至出现如下内存即将被使用完毕的情景

1:查看reids中的日志队列长度发现有大量的日志堆积在redis 当中

2:在写入redis日志信息的logstash主机上编写python脚本(编写python脚本,获取redis中列表长度并返回值)

1
2
3
4
5
6
7
8
9
10
11
~] # cat redis-llen.py
#!/usr/bin/env python
#coding:utf-
#Author Dai zhe
import redis
def redis_conn():
pool=redis.ConnectionPool(host="Redis主机地址",port=6379,db=1,password=123456)
conn = redis.Redis(connection_pool=pool)
data = conn.llen('存储在redis的KEY名称')
print(data)
redis_conn()

3:安装管理模块

1
2
~]# yum install python-pip
~]# pip install redis

4:执行脚本

1
2
~]# python redis-llen.py
这里显示redia KEY对应的值的大小


脚本编写进行ES存储定期清理

在ES主机上

  • 关于elasticsearch存储index的定期删除,elasticsearch存储的日志较多的时候会影响搜索性能,因此建议定期清理,脚本如下,这次是shell脚本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[root@els-s2 ~]# vim els_delete.sh
#!/bin/bash
DATE=`date -d "2 days ago" +%Y.%m.%d`
LOG_NAME="api4-systemlog-7-103" #索引名称,定义的索引尽量使用含有日期格式的索引
FILE_NAME=${LOG_NAME}-${DATE}
curl -XDELETE http://ES主机地址:9200/${FILE_NAME}
echo "${FILE_NAME} delete success"

[root@els-s2 ~]# chmod a+x /root/els_delete.sh


测试执行:

[root@els-s2 ~]# bash -x /root/els_delete.sh
++ date -d '30 days ago' +%Y.%m.%d
+ DATE=2016.08.05
+ LOG_NAME=api4-systemlog
+ FILE_NAME=api4-systemlog-2016.08.05
+ curl -XDELETE http://hfelk.chinacloudapp.cn:9200/api4-systemlog-2016.08.05
{"acknowledged":true}+ echo 'api4-systemlog-2016.08.05 delete success'
api4-systemlog-2016.08.05 delete success
6.2:添加任务计定期执行即可:

[root@els-s2 ~]# crontab -e
1 * * * * /root/els_delete.sh #每天凌晨一点清除一月之前的日志

备注:

  • 千万不要直接去ES主机上删除数据
    • /data/esdata/
-------------------码字不易尊重原创转载标注不胜感激-------------------
Yes or no?
0%