生产消费消息
更新时间:2023-11-06 09:26:47
本小节主要介绍如何在命令行模式下通过 Kafka 生产消费消息。
发送消息
在客户端节点执行以下命令,向 Topic 发送消息。
cd /opt/kafka/current/bin
./kafka-console-producer.sh --broker-list {连接地址} --topic {Topic 名称}
-
连接地址:所连接的 Kafka 集群的地址,格式为 host_ip1:port,host_ip2:port,host_ip3:port。
host_ip 为 Kafka 节点的 IP 地址,port 为客户端节点的访问端口 9092。
若 Kafka 节点地址为:192.168.0.1, 192.168.0.2, 192.168.0.3, 则连接地址为:192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 -
Topic 名称:创建的 Topic 名称。
输入需要发送的消息内容,按 Enter 发送消息,每一行的内容都将作为一条消息发送到 Kafka。
示例
$ cd /opt/kafka/current/bin
./kafka-console-producer.sh --broker-list 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test
>hi
>hello world
>how are you
消费消息
在客户端执行以下命令,消费 Topic 消息。
cd /opt/kafka/current/bin
./kafka-console-consumer.sh --bootstrap-server {连接地址} --topic {Topic 名称} --from-beginning
-
连接地址:与发送消息的连接地址相同。
-
Topic 名称:与发送消息的 Topic 名称相同。
-
from-beginning 表示从最新的开始消费。
示例
$ cd /opt/kafka/current/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test --from-beginning
hi
hello world
how are you
查看 Topic 消息分布情况
查看 test 消息分布情况
$ cd /opt/kafka/current/bin
./kafka-topics.sh --describe --zookeeper 192.168.0.6:2181,192.168.0.8:2181,192.168.0.7:2181/kafka/cl-zom1un35 --topic test
Topic:test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: -1 Replicas: 1 Isr: 1
Topic: test Partition: 1 Leader: -1 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 3 Replicas: 3 Isr: 3
查看消费者消费情况
kafka 0.9.0.0 以后的版本使用 kafka-consumer-groups.sh 查看消费者消费情况。
$ cd /opt/kafka/current/bin
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.3:9092,192.168.0.4:9092,192.168.0.9:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 10 10 0 consumer-1-0000f0c2-eee7-432b-833b-c882334c8f71 /192.168.100.26 consumer-1
test 1 7 7 0 consumer-1-0000f0c2-eee7-432b-833b-c882334c8f71 /192.168.100.26 consumer-1
kafka 0.9.0.0 以前的版本使用 kafka-consumer-offset-checker.sh 查看消费者消费情况。
$ cd /opt/kafka/current/bin
./kafka-consumer-offset-checker.sh --zookeeper 192.168.0.6:2181,192.168.0.8:2181,192.168.0.7:2181/kafka/cl-zom1un35 --topic test --group my-group