由于Kafka从3.0版本开始引入KRaft(Kafka Raft)协议,使其能够在没有 Zookeeper 的情况下运行

一、环境准备

安装JAVA环境

sudo apt install openjdk-8-jdk

下载Kafka离线包

# https://downloads.apache.org/kafka/
tar -zvxf kafka_2.12-3.5.2.tgz

修改~/.bashrc文件,在文件尾部增加Kafka环境变量

#add kafka env
export KAFKA_HOME=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0
export PATH=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/bin:$PATH

更新环境变量

source ~/.bashrc

修改日志文件夹,默认在temp目录中会被清空,/config/kraft/server.properties文件中log.dirs指向目录

# A comma separated list of directories under which to store log files
log.dirs=/home/zhuoyuhan/software/kafka_server/kafka-logs

二、运行Kafka

在kafka根目录执行,创建kafka集群ID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo $KAFKA_CLUSTER_ID

将查询出的集群ID设置到/config/kraft/server.properties文件中,可以放置在日志目录下方

cluster.id='MeKxKVflQjWTg7JrDakwEw'

修改IP远程连接

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://192.168.31.122:9092,CONTROLLER://192.168.31.122:9093

在kafka根目录执行,进程方式启动kafka,并非后台运行

# bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties 
bin/kafka-server-start.sh config/kraft/server.properties

查询Kafka集群中的主题,如果没有主题则返回空

kafka-topics.sh --list --bootstrap-server 192.168.31.122:9092

创建主题

# --create 创建
# --topic zyh-test-topic 主题名称
# --bootstrap-server 192.168.31.122:9092 Kafka集群的连接地址
kafka-topics.sh --create --topic zyh-test-topic --bootstrap-server 192.168.31.122:9092

查看主题信息

kafka-topics.sh --describe  --topic zyh-test-topic --bootstrap-server 192.168.31.122:9092

删除主题

kafka-topics.sh --delete  --topic zyh-test-topic --bootstrap-server 192.168.31.122:9092

创建生产者

kafka-console-producer.sh --bootstrap-server 192.168.31.122:9092 --topic zyh-test-topic

创建消费者

kafka-console-consumer.sh --bootstrap-server 192.168.31.122:9092 --topic zyh-test-topic --from-beginning

三、注册系统服务

创建Systemd服务文件,sudo gedit /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/bin/kafka-server-start.sh /home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/config/kraft/server.properties
ExecStop=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

重新加载Systemd配置

sudo systemctl daemon-reload

开启自启动

sudo systemctl enable kafka.service

启动、查看、停止、重启Kafka服务

sudo systemctl start kafka.service
sudo systemctl status kafka.service
sudo systemctl stop kafka.service
sudo systemctl restart kafka.service