ZooKeeper and Kafka


Introduction to Kafka and ZooKeeper:

  • Kafka (default port: 9092) is billed as a next-generation distributed messaging system. It is an open-source project of the non-profit Apache Software Foundation (ASF), the same foundation behind open-source software such as HTTP Server, Hadoop, ActiveMQ and Tomcat. Comparable messaging systems include RabbitMQ, ActiveMQ and ZeroMQ; Kafka's main advantages are that it is distributed by design and, combined with ZooKeeper, supports dynamic scale-out.
  • Reference article: http://www.infoq.com/cn/articles/apache-kafka

  • ZooKeeper (default port: 2181) is an open-source, distributed coordination service for distributed applications.
    • ZooKeeper cluster behavior: the cluster stays available as long as more than half of its members are working. With a 2-node cluster, losing either node leaves only one, which is not more than half, so the cluster becomes unavailable. With 3 nodes, losing one still leaves two, which is more than half of three, so the cluster keeps working; losing a second node takes it down. With 4 nodes, losing one is fine, but losing two leaves only two, which again is not more than half, so both a 3-node and a 4-node cluster tolerate exactly one failure. The same reasoning applies to 5 vs. 6 nodes, 7 vs. 8, and so on, which is why ZooKeeper clusters are usually built with an odd number of members. The table after this list summarizes the pattern.
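
The availability rule is simply that a quorum of floor(N/2)+1 members must be healthy, so the number of tolerated failures only grows with every second node added:

cluster size    quorum needed    tolerated failures
3               2                1
4               3                1
5               3                2
6               4                2
7               4                3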

ZooKeeper cluster deployment

Install ZooKeeper:

Three servers are used. Note: the ZooKeeper and Kafka clusters can also be split across six hosts, three for each.
Their IPs are: 172.18.135.1    172.18.135.2    172.18.135.5

1: Configure the hosts file on all three servers:

cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.18.135.1 es1.com
172.18.135.2 es2.com
172.18.135.5 es5.com

2: ZooKeeper requires a Java runtime, so install a JDK on all three hosts:

~]# java -version
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
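
If a JDK is not yet installed, it can be pulled in from the distribution repositories; a minimal sketch, assuming CentOS/RHEL hosts with yum (package names may differ on other distributions):

~]# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
~]# java -version   #should report OpenJDK 1.8.x as above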

3: Download the installation packages:
Kafka download page:
http://kafka.apache.org/downloads.html

ZooKeeper download page:
http://zookeeper.apache.org/releases.html

4: Install ZooKeeper on all three hosts (a Java environment is required):

Install ZooKeeper from the binary tarball on all three hosts (the configuration is identical on each and is shown below):

~]# cd /usr/local/src/
src]# ls
zookeeper-3.4.13.tar.gz

src]# tar xvf zookeeper-3.4.13.tar.gz

Create a symlink to make future upgrades easier:

src]# ln -sv /usr/local/src/zookeeper-3.4.13 /usr/local/zookeeper


Create the ZooKeeper configuration file on all three hosts:

~]# cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg

~]# grep "^[a-Z]" /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
maxClientCnxns=4096
autopurge.snapRetainCount=3
autopurge.purgeInterval=72
server.1=172.18.135.1:2888:3888
server.2=172.18.135.2:2888:3888
server.3=172.18.135.5:2888:3888

Create the ZooKeeper data directory on all three hosts:

~]# mkdir /usr/local/zookeeper/data

Manually create the server ID (myid) on each host:

172.18.135.1
~]# echo "1" > /usr/local/zookeeper/data/myid

172.18.135.2
~]# echo "2" > /usr/local/zookeeper/data/myid

172.18.135.5
~]# echo "3" > /usr/local/zookeeper/data/myid


Start ZooKeeper on all three machines:

~]# ls /usr/local/zookeeper/bin/
zkServer.sh #script to start/stop the server
zkCli.sh #command-line client script

~]# /usr/local/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

Check whether the ports are listening on each host:
#By the time the ZooKeeper processes are up, the roles have already been elected: the leader listens on 2181, 2888 and 3888, while a follower listens on 2181 and 3888 (2888 is the data-sync port that only the leader binds; followers connect out to it).

~]# ss -tnl
::ffff:172.18.135.2:3888
:::2181
::ffff:172.18.135.2:2888
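
Besides checking the listening ports, ZooKeeper's four-letter-word commands on the client port give a quick health check; a sketch, assuming nc (netcat) is installed and the four-letter-word commands are not disabled:

~]# echo ruok | nc 127.0.0.1 2181   #a healthy server answers: imok
~]# echo stat | nc 127.0.0.1 2181   #prints version, client connections and this node's Mode (leader/follower)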

ZooKeeper configuration file explained

src]# grep "^[a-Z]" /usr/local/zookeeper/conf/zoo.cfg 
tickTime=2000 #length of a single heartbeat tick in milliseconds, used both between servers and between clients and servers
initLimit=5 #maximum number of ticks (i.e. 5*2000 ms) a follower may take to connect to and sync with the leader on startup
syncLimit=5 #maximum number of ticks a follower may take to respond to the leader after the initial sync; a follower that cannot talk to the leader within 5*2000 ms is considered unavailable
clientPort=2181 #port the ZooKeeper server listens on for client connections
dataDir=/usr/local/zookeeper/data #directory where ZooKeeper stores its data (snapshots and the myid file)
autopurge.snapRetainCount=3 #number of snapshots (and corresponding transaction logs) to keep when auto-purging
autopurge.purgeInterval=1 #interval in hours between auto-purge runs (0 disables automatic purging)
server.1=192.168.15.211:2888:3888 #server ID = server IP : leader/follower data-sync port : leader-election port
server.2=192.168.15.212:2888:3888
server.3=192.168.15.213:2888:3888

5: Check each host's ZooKeeper node status

Client programs write through the leader and can read from followers; this routing is handled transparently by the underlying Java client library.

~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader #role: leader / follower

~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

Basic ZooKeeper operation test

6: Connect to the leader node and create data

Connect a client to the leader node:

~]# /usr/local/zookeeper/bin/zkCli.sh -server 172.18.135.2:2181 #specify the server node address and port

Create a znode under /:

[zk: 172.18.135.2:2181(CONNECTED) 0] create /test "hello"
Created /test
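
A few more built-in zkCli.sh commands are handy for exploring; the example below uses a hypothetical /demo znode so that /test is left intact for the follower check in the next step:

[zk: 172.18.135.2:2181(CONNECTED) 1] ls /                  #list children of the root znode
[zk: 172.18.135.2:2181(CONNECTED) 2] create /demo "data"   #create another znode
[zk: 172.18.135.2:2181(CONNECTED) 3] set /demo "updated"   #change its data (increments dataVersion)
[zk: 172.18.135.2:2181(CONNECTED) 4] delete /demo          #remove it (only works on znodes without children)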

7: Connect to a follower node and verify that the data has been replicated from the leader

Connect a client to a follower node and read the data:

~]# /usr/local/zookeeper/bin/zkCli.sh -server 172.18.135.1:2181
[zk: 172.18.135.1:2181(CONNECTED) 0] get /test
hello
cZxid = 0x100000004
ctime = Sat Feb 23 19:11:52 CST 2019
mZxid = 0x100000004
mtime = Sat Feb 23 19:11:52 CST 2019
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0


Kafka cluster deployment

  • Kafka concepts (port 9092):
    • Broker
      • A Kafka cluster consists of one or more servers; each such server is called a broker.
    • Topic
      • Every message published to a Kafka cluster belongs to a category called a topic. (Physically, messages of different topics are stored separately; logically, a topic's messages may be spread over one or more brokers, but producers and consumers only have to name the topic and never need to care where the data physically lives.)
    • Partition
      • A partition is a physical concept: each topic consists of one or more partitions, and the partition count can be set when the topic is created. Each partition maps to a directory on disk that holds that partition's data and index files (see the directory listing sketched after this list).
    • Producer
      • Produces messages, i.e. publishes messages to Kafka brokers.
    • Consumer
      • Consumes messages. Every consumer belongs to a consumer group (a group name can be set per consumer; consumers without one fall into the default group). With the high-level consumer API, a given message of a topic is delivered to only one consumer within a consumer group, but multiple consumer groups can each consume that same message.
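
The partition-to-directory mapping can be seen on any broker once a topic exists; a sketch, assuming log.dirs=/usr/local/kafka/logs as configured below and the logstashtest topic that is created later in this guide:

~]# ls /usr/local/kafka/logs/
logstashtest-0  logstashtest-1  logstashtest-2   #one directory per partition replica hosted by this broker (plus Kafka's internal bookkeeping files)
~]# ls /usr/local/kafka/logs/logstashtest-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex   #log segment and index files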

1: Install Kafka on the same three hosts that run ZooKeeper

Unpack the package:

~]# cd /usr/local/src/
src]# ls
kafka_2.12-2.1.0.tgz

src]# tar xvf kafka_2.12-2.1.0.tgz

src]# ln -sv /usr/local/src/kafka_2.12-2.1.0 /usr/local/kafka
‘/usr/local/kafka’ -> ‘/usr/local/src/kafka_2.12-2.1.0’

Edit each host's configuration file:

172.18.135.1

~]# vim /usr/local/kafka/config/server.properties
21 broker.id=1 #globally unique integer ID of this broker
31 listeners=PLAINTEXT://172.18.135.1:9092 #address this Kafka broker listens on (the local IP)
60 log.dirs=/usr/local/kafka/logs #directory where Kafka stores its topic data (log segments)
65 num.partitions=1 #default number of partitions for newly created topics
103 log.retention.hours=72 #how many hours of data to retain
123 zookeeper.connect=172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181 #all ZooKeeper addresses; Kafka registers itself in this ZooKeeper ensemble

Create the data directory:
~]# mkdir -p /usr/local/kafka/logs

172.18.135.2

~]# vim /usr/local/kafka/config/server.properties
21 broker.id=2
31 listeners=PLAINTEXT://172.18.135.2:9092
60 log.dirs=/usr/local/kafka/logs
65 num.partitions=1
103 log.retention.hours=72
123 zookeeper.connect=172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181
Create the data directory:
~]# mkdir -p /usr/local/kafka/logs

172.18.135.5

~]# vim /usr/local/kafka/config/server.properties
21 broker.id=3
31 listeners=PLAINTEXT://172.18.135.5:9092
60 log.dirs=/usr/local/kafka/logs
65 num.partitions=1
103 log.retention.hours=72
123 zookeeper.connect=172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181
Create the data directory:
~]# mkdir -p /usr/local/kafka/logs

2: Start Kafka on each host

~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties  #start it as a daemon

~]# ss -tnl
::ffff:172.18.135.2:9092 #Kafka listening port
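
To confirm that all three brokers have registered themselves, their IDs can be listed under the /brokers/ids znode in ZooKeeper (using the zkCli.sh client installed earlier):

~]# /usr/local/zookeeper/bin/zkCli.sh -server 172.18.135.1:2181
[zk: 172.18.135.1:2181(CONNECTED) 0] ls /brokers/ids
[1, 2, 3]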

Add ZooKeeper and Kafka to the boot sequence

Configure ZooKeeper and Kafka to start automatically at boot on each host:

~]# vim /etc/rc.d/rc.local 
/usr/local/zookeeper/bin/zkServer.sh start
sleep 20
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

~]# chmod +x /etc/rc.d/rc.local

Kafka/ZooKeeper command-line tests

1: Verify the Kafka process

~]# jps
26450 Jps
1079 Elasticsearch
10905 QuorumPeerMain
19790 Kafka

2: Create a test topic via ZooKeeper: a topic named logstashtest with partitions=3 and replication-factor=3:
Run this on any Kafka server:

~]# /usr/local/kafka/bin/kafka-topics.sh --create  --zookeeper 172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181 --partitions 3 --replication-factor 3 --topic logstashtest
Created topic "logstashtest".

3: Describe the topic via ZooKeeper
This can be tested on any Kafka server:

~]# /usr/local/kafka/bin/kafka-topics.sh  --describe --zookeeper 172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181  --topic logstashtest
Topic:logstashtest PartitionCount:3 ReplicationFactor:3 Configs:
Topic: logstashtest Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,2,3
Topic: logstashtest Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: logstashtest Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1

  • Reading the output: logstashtest has three partitions, 0, 1 and 2. The leader of partition 0 is broker 1 (its broker.id), partition 0 has three replicas, and all of them are listed under Isr (in-sync replicas, i.e. replicas that are caught up and eligible to be elected leader).

4: List all topics

~]# /usr/local/kafka/bin/kafka-topics.sh  --list --zookeeper 172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181
logstashtest

5: Delete a topic

~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181  --topic logstashtest

Testing message delivery with the Kafka command-line tools

1: Create a topic

~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.18.135.1:2181,172.18.135.2:2181,172.18.135.5:2181 --partitions 3 --replication-factor 3 --topic  messagetest
Created topic "messagetest".

2: Send messages

~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list  172.18.135.1:9092,172.18.135.2:9092,172.18.135.5:9092 --topic  messagetest
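
The producer waits for lines on stdin and turns each line typed into a message. To confirm delivery, a console consumer can be started in a second terminal on any broker (the same tool is used again later in this guide):

~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.18.135.1:9092 --topic messagetest --from-beginning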


Using Logstash to write test data into Kafka

Logstash's Kafka output (collecting logs and shipping them to Kafka) is documented in the official reference: https://www.elastic.co/guide/en/logstash/5.6/plugins-outputs-kafka.html

1: Edit the Logstash configuration file

~]# vim /etc/logstash/conf.d/logstash-kafka.sh

input {                     #read events from standard input (whatever is typed in the terminal)
  stdin {}
}
output {                    #write the collected events into Kafka
  kafka {
    codec => json
    topic_id => "messagetest"                  #target topic; if it does not exist in the Kafka cluster yet, Kafka creates it automatically
    bootstrap_servers => "172.18.135.1:9092"   #address of any broker in the Kafka cluster
    batch_size => 5
  }
  stdout {                  #also print a copy of each event to the terminal
    codec => rubydebug
  }
}

2: Check the Logstash configuration syntax and start it

~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-kafka.sh -t
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
Configuration OK

~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-kafka.sh

3: Verify that Kafka receives the data from Logstash

3.1: Start Logstash and type input in the terminal

~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-kafka.sh
[INFO ] 2019-03-02 18:44:19.332 [[main]-pipeline-manager] AppInfoParser - Kafka version : 0.10.0.1
[INFO ] 2019-03-02 18:44:19.333 [[main]-pipeline-manager] AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
The stdin plugin is now waiting for input:
haha
{
"@version" => "1",
"host" => "es3.com",
"@timestamp" => 2019-03-02T10:45:27.185Z,
"message" => "haha"
}

logstash
{
"@version" => "1",
"host" => "es3.com",
"@timestamp" => 2019-03-02T10:49:07.782Z,
"message" => "logstash"
}

3.2: Consume the messagetest topic from the Kafka cluster and confirm that the messages arrived

~]# /usr/local/kafka/bin/kafka-console-consumer.sh --topic messagetest --bootstrap-server 172.18.135.1:9092 --from-beginning
haha
{"@version":"1","host":"es3.com","@timestamp":"2019-03-02T10:45:27.185Z","message":"haha"}
{"@version":"1","host":"es3.com","@timestamp":"2019-03-02T10:49:07.782Z","message":"logstash"}


A backend Logstash collects a single Nginx log file and buffers it in the ZK/Kafka cluster; a Logstash on the ZK/Kafka cluster then pulls the data out and stores it in Elasticsearch

1: Edit the Logstash configuration on the host that collects the Nginx log, so that the collected log entries are written into the ZK/Kafka cluster

~]# vim /etc/logstash/conf.d/nginx-kafka.conf
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    type => "nginx-accesslog-1512"
    codec => "json"                             #the Nginx access log is written in JSON format
  }
}

output {
  if [type] == "nginx-accesslog-1512" {
    kafka {
      bootstrap_servers => "192.168.15.11:9092" #address of any server in the Kafka cluster
      topic_id => "nginx-accesslog-1512"        #topic under which the log is stored in Kafka
      codec => "json"                           #encode events as JSON when writing
    }

    file {
      path => "/tmp/nginx-jsog-log.txt"         #also keep a local copy of the log
    }
  }
}
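
The codec => "json" on the file input assumes Nginx itself writes the access log in JSON. A minimal sketch of such a log_format in nginx.conf (the field names are illustrative, not taken from the original setup):

log_format access_json '{"@timestamp":"$time_iso8601","clientip":"$remote_addr",'
                       '"request":"$request","status":"$status","size":"$body_bytes_sent",'
                       '"referer":"$http_referer","useragent":"$http_user_agent"}';
access_log /var/log/nginx/access.log access_json;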

2: Check the Logstash configuration syntax and start Logstash

~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx-kafka.conf -t
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
Configuration OK
~]# systemctl restart logstash

3: Access the Nginx web server to generate some requests

~]# ab -n100 -c10 http://<nginx address>/index.html

4: Verify that the log is written to the file under /tmp

~]# head /tmp/nginx-jsog-log.txt

5: Configure Logstash to read the logs back from Kafka (this Logstash can be installed on any host of the ZK/Kafka cluster)

~]# vim /etc/logstash/conf.d/kafka-es.conf
input {                                         #read the events back from Kafka
  kafka {
    bootstrap_servers => "192.168.15.11:9092"   #address of any server in the Kafka cluster
    topics => "nginx-accesslog-1512"            #topic this Logstash consumes from Kafka
    codec => "json"
    consumer_threads => 1
    #decorate_events => true
  }
}

output {                                        #ship the events to the backend Elasticsearch
  if [type] == "nginx-accesslog-1512" {
    elasticsearch {
      hosts => ["<ES host address>:9200"]
      index => "nginx-accesslog-kafka-1512-%{+YYYY.MM.dd}"   #index name pattern
    }
  }
}

6: Check the syntax and restart Logstash

~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-es.conf -t
~]# systemctl restart logstash

7: Verify the data with the head plugin on the ES host

8: Add the Nginx access-log index in Kibana

9: Verify the data in Kibana


Using Logstash to collect multiple log files and write them into Kafka:

1: Configure Logstash to also collect the system messages log

~]# chmod 644 /var/log/messages
conf.d]# cat nginx-kafka.conf
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    type => "nginx-accesslog-1512"
    codec => "json"
  }
  file {
    path => "/var/log/messages"
    start_position => "beginning"
    type => "system-log-1512"
  }
}

output {
  if [type] == "nginx-accesslog-1512" {
    kafka {
      bootstrap_servers => "192.168.15.11:9092"
      topic_id => "nginx-accesslog-1512"
      batch_size => 5
      codec => "json"
    }
  }

  if [type] == "system-log-1512" {
    kafka {
      bootstrap_servers => "192.168.15.11:9092"
      topic_id => "system-log-1512"
      batch_size => 5
      codec => "json"        #write as JSON, since Logstash converts events to JSON after collection
    }
    file {
      path => "/tmp/systemlog-1512-log.txt"
    }
  }
}

2: Test the configuration and restart Logstash

~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx-kafka.conf  -t
~]# systemctl restart logstash

3: Verify that Logstash has started

4: Verify that the logs are written to the target file

5: Configure Logstash to read the system log back from Kafka

#If this step does not pick up logs from Kafka correctly, or nothing is written to the output file, run /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-es.conf in the foreground and use the stdout output for troubleshooting.

~]# cat /etc/logstash/conf.d/kafka-es.conf
input {
  kafka {
    bootstrap_servers => "192.168.15.11:9092"
    topics => "nginx-accesslog-1512"
    codec => "json"
    consumer_threads => 1
    decorate_events => true
  }

  kafka {
    bootstrap_servers => "192.168.15.11:9092"
    topics => "system-log-1512"
    consumer_threads => 1
    decorate_events => true
    codec => "json"
  }
}

output {
  if [type] == "nginx-accesslog-1512" {
    elasticsearch {
      hosts => ["192.168.15.11:9200"]
      index => "nginx-accesslog-1512-%{+YYYY.MM.dd}"
    }
  }

  if [type] == "system-log-1512" {
    elasticsearch {
      hosts => ["192.168.15.12:9200"]
      index => "system-log-1512-%{+YYYY.MM.dd}"
    }
    stdout {
      codec => "rubydebug"
    }
  }
}

6: Verify the configuration and start Logstash

~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-es.conf -t
~]# systemctl start logstash

7: Verify the data with the head plugin

8: Add the indices in Kibana

9: Verify the data in Kibana


Notes:

  • Logstash can also be run as two processes at the same time to collect logs (a quick way to verify both is shown below):
    • /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-es.conf #the systemd-managed process collects logs
    • /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-es.conf & #a manually started process collects logs
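
To confirm that both Logstash processes are actually running, a quick check:

~]# ps -ef | grep logstash | grep -v grep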
------------------- Writing this up took effort; please respect the original work and credit it when reposting -------------------