Flume 推送日志到 Kafka

Posted by aclyyx on 04-23,2021

1.基础环境说明

  • 虚拟运行环境:VirtualBox 6.1
  • 系统:CentOS 7.5
  • Java:Zulu JDK 11.48+21-CA (build 11.0.11+9-LTS)
  • Kafka:2.12-2.8.0
  • Flume:1.9.0

2.搭建 Kafka测试环境

2.1 安装 Kafka

Kafka官网 下载tgz文件

tar -vxzf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0/ kafka
cd kafka

2.2 启动 ZooKeeper 服务

使用默认配置启动

$ bin/zookeeper-server-start.sh config/zookeeper.properties

2.3 配置启动 Kafka

进入kafka的config配置文件目录,所有配置文件都存放在该目录下。

cd config

2.3.1 Kafka 主服务

2.3.1.1 配置使用 SASL_PLAINTEXT 认证

编辑server.properties文件,找到listeners = PLAINTEXT://your.host.name:9092并在下面做出修改。

listeners=SASL_PLAINTEXT://192.168.1.123:9092

advertised.listeners=SASL_PLAINTEXT://192.168.1.123:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

说明:

  1. 开启SASL认证 SASL_PLAINTEXT ;
  2. 监听IP192.168.1.123的9092端口(本机端口),注意:不可以用 0.0.0.0:9092 的方式。
2.3.1.2 配置服务授权信息

创建文件kafka_server_jaas.conf(文件名和路径随意,但需要与后面的配置和命令相对应),创建文件后填写如下内容:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-test"
    user_admin="admin-test"
    user_producer="prod-test"
    user_consumer="cons-test";
};

说明:

  1. 第一行KafkaServer说明该文件内容为Kafka服务信息;
  2. username 和 password 配置管理员信息;
  3. user_producer="prod-test"配置用户 producer 的密码为 prod-test;
  4. user_consumer="cons-test"配置用户 consumer 的密码为 cons-test。
2.3.1.3 启动 Kafka Server
$ bin/kafka-server-start.sh config/server.properties

2.3.2 配置消费者

2.3.2.1 配置使用 SASL_PLAINTEXT 认证

消费者用来观察 Flume 推送过来的信息,编辑consumer.properties文件,在文件末尾添加信息。

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2.3.2.2 配置服务授权信息
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="consumer"
    password="cons-test";
};
2.3.2.3 修改 bin/kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/root/kafka/config/kafka_client_jaas.conf"
fi
2.3.2.4 启动消费者
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.123:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

2.3.3 配置启动生产者以测试Kafka服务

2.3.3.1 修改 bin/kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M  -Djava.security.auth.login.config=/root/kafka/config/kafka_client_jaas.conf"
fi
2.3.3.3 启动生产者
$ bin/kafka-console-producer.sh --broker-list 192.168.1.123:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

启动成功后,会出现>提示符;此时,在提示符后面输入字符并发送,就可以在消费者界面看到已经接收到信息,表示 Kafka 启动成功!

3.Flume

3.1 Flume 配置文件

$ vi flume/job/telnet-to-kafka.conf

agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
# 监听 /root/kafka.log 文件的变化并推送到 Kafka
agent.sources.s1.command=tail -F /root/kafka.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
#设置Kafka接收器
# agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
# agent.sinks.k1.brokerList=192.168.1.123:9092
#设置Kafka的Topic
# agent.sinks.k1.topic=test
#设置序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#设置Kafka接收器
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
agent.sinks.k1.kafka.bootstrap.servers = 192.168.1.123:9092
agent.sinks.k1.kafka.topic = test
agent.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
agent.sinks.k1.kafka.producer.sasl.kerberos.service.name = producer
agent.sinks.k1.channel = c1
agent.sources.s1.channels = c1

3.2 创建并编辑授权文件

$ vi flume/job/kafka_client_jaas.conf

KafkaClient {  
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="producer"  
        password="prod-test";  
};

3.3 启动 Flume

$ bin/flume-ng agent -n agent -c conf -f job/telnet-to-kafka.conf -Djava.security.auth.login.config=/root/flume/job/kafka_client_jaas.conf -Dflume.root.logger=INFO,console

3.4 测试

向被监听的文件中追加内容

$ echo test-logger-20210422-001>>/root/kafka.log

追加内容后查看 Kafka 的消费者界面,如果看到追加的文件内容,表示 Kafka 大功告成!

附录

监听文件夹上传指定日志文件

agent.sources.s1.type=spooldir
agent.sources.s1.spoolDir=/root/logs
# agent.sources.s1.includePattern=^.*\.log$
# 正则表达式限制指定文件
agent.sources.s1.includePattern=^jeecgboot-[0-9]{4}-[0-9]{2}-[0-9]{2}\.[0-9]*\.log$
agent.sources.s1.fileSuffix=.push

4. 参考

kafka 快速安装使用
kafka 官方指南
Centos7.3防火墙配置
Kafka的安全认证机制SASL/PLAINTEXT