正文
该配置通过syslog source配置localhost tcp 5140端口来接收网络设备发送的Syslog信息,event缓存在内存中,再通过KafkaSink将日志发送到kafka集群中名为“syslog-kafka”的topic中。
Logstash来自Elastic公司,专为收集、分析和传输各类日志、事件以及非结构化的数据所设计。它有三个主要功能:事件输入(Input)、事件过滤器(Filter)以及事件输出(Output),在后缀为.conf的配置文件中设置,本例中Syslog配置如下:
# syslog.conf
input {
Syslog {
port => "514"
}
}
filter {
}
output {
kafka {
bootstrap_servers => "192.168.8.65:9092"
topic_id => "syslog-kafka"
compression_type => "snappy"
codec => plain {
format => "%{host} %{@timestamp} %{message}"
}
}
}
Input(输入)插件用于指定各种数据源,本例中的Logstash通过udp 514端口接收Syslog信息;
Filter(过滤器)插件虽然在本例中不需要配置,但它的功能非常强大,可以进行复杂的逻辑处理,包括正则表达式处理、编解码、k/v切分以及各种数值、时间等数据处理,具体可根据实际场景设置;
Output(输出)插件用于将处理后的事件数据发送到指定目的地,指定了Kafka的位置、topic以及压缩类型。在最后的Codec编码插件中,指定来源主机的IP地址(host)、Logstash处理的时间戳(@timestamp)作为前缀并整合原始的事件消息(message),方便在事件传输过程中判断Syslog信息来源。单条原始Syslog信息流样例如下:
147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down
Logstash Output插件处理后的信息流变成为:
19.1.1.12 2016-10-13T10:04:54.520Z 12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down
其中红色字段就是codec编码插件植入的host以及timestamp信息。处理后的Syslog信息会发送至Kafka集群中进行消息的缓存。
Kafka日志缓冲
Kafka是一个高吞吐的分布式消息队列,也是一个订阅/发布系统。Kafka集群中每个节点都有一个被称为broker的实例,负责缓存数据。Kafka有两类客户端,Producer(消息生产者的)和Consumer(消息消费者)。Kafka中不同业务系统的消息可通过topic进行区分,每个消息都会被分区,用以分担消息读写负载,每个分区又可以有多个副本来防止数据丢失。消费者在具体消费某个topic消息时,指定起始偏移量。Kafka通过Zero-Copy、Exactly Once等技术语义保证了消息传输的实时、高效、可靠以及容错性。
Kafka集群中某个broker的配置文件server.properties的部分配置如下:
########## Server Basics ###########
# 为每一个broker设置独立的数字作为id
broker.id=1
###### Socket Server Settings ######
# socket监听端口
port=9092
########### Zookeeper ##############
# Zookeeper的连接配置
zookeeper.connect=gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=3000
其中需指定集群里不同broker的id,此台broker的id为1,默认监听9092端口,然后配置Zookeeper(后续简称zk)集群,再启动broker即可。
Kafka集群名为syslog-kafka的topic:
bin/kafka-topics.sh
--create
--zookeeper gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181
--replication-factor 3 --partitions 3
--topic syslog-kafka
Kafka集群的topic以及partition等信息也可以通过登录zk来观察。然后再通过下列命令查看Kafka接收到的所有交换机日志信息:
bin/kafka-console-consumer.sh--zookeeper gtcluster-slave02:2181--from-beginning
--topic Syslog-kafka
部分日志样例如下:
10.1.1.10 2016-10-18T05:23:04.015Z 5585: Oct 18 13:22:45: %LINK-3-UPDOWN: Interface FastEthernet0/9, changed state to down
19.1.1.113 2016-10-18T05:24:04.425Z 10857: Oct 18 13:25:23.019 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to down
19.1.1.113 2016-10-18T05:24:08.312Z 10860: Oct 18 13:25:27.935 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to up