由于Kafka从3.0版本开始引入KRaft(Kafka Raft)协议,使其能够在没有 Zookeeper 的情况下运行
一、环境准备
安装JAVA环境
sudo apt install openjdk-8-jdk
下载Kafka离线包
# https://downloads.apache.org/kafka/
tar -zvxf kafka_2.12-3.5.2.tgz
修改~/.bashrc文件,在文件尾部增加Kafka环境变量
#add kafka env
export KAFKA_HOME=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0
export PATH=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/bin:$PATH
更新环境变量
source ~/.bashrc
修改日志文件夹,默认在temp目录中会被清空,/config/kraft/server.properties
文件中log.dirs指向目录
# A comma separated list of directories under which to store log files
log.dirs=/home/zhuoyuhan/software/kafka_server/kafka-logs
二、运行Kafka
在kafka根目录执行,创建kafka集群ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo $KAFKA_CLUSTER_ID
将查询出的集群ID设置到/config/kraft/server.properties
文件中,可以放置在日志目录下方
cluster.id='MeKxKVflQjWTg7JrDakwEw'
修改IP远程连接
# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://192.168.31.122:9092,CONTROLLER://192.168.31.122:9093
在kafka根目录执行,进程方式启动kafka,并非后台运行
# bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
查询Kafka集群中的主题,如果没有主题则返回空
kafka-topics.sh --list --bootstrap-server 192.168.31.122:9092
创建主题
# --create 创建
# --topic zyh-test-topic 主题名称
# --bootstrap-server 192.168.31.122:9092 Kafka集群的连接地址
kafka-topics.sh --create --topic zyh-test-topic --bootstrap-server 192.168.31.122:9092
查看主题信息
kafka-topics.sh --describe --topic zyh-test-topic --bootstrap-server 192.168.31.122:9092
删除主题
kafka-topics.sh --delete --topic zyh-test-topic --bootstrap-server 192.168.31.122:9092
创建生产者
kafka-console-producer.sh --bootstrap-server 192.168.31.122:9092 --topic zyh-test-topic
创建消费者
kafka-console-consumer.sh --bootstrap-server 192.168.31.122:9092 --topic zyh-test-topic --from-beginning
三、注册系统服务
创建Systemd服务文件,sudo gedit /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/bin/kafka-server-start.sh /home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/config/kraft/server.properties
ExecStop=/home/zhuoyuhan/software/kafka_server/kafka_2.12-3.9.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
重新加载Systemd配置
sudo systemctl daemon-reload
开启自启动
sudo systemctl enable kafka.service
启动、查看、停止、重启Kafka服务
sudo systemctl start kafka.service
sudo systemctl status kafka.service
sudo systemctl stop kafka.service
sudo systemctl restart kafka.service