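Standalone Consumer Example (Subscribing to a Topic)
1. Requirement
- A consumer group id must be configured in consumer API code. When a consumer is started from the command line without a group id, it is automatically given a random group id.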
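In kafka-python, `group_id` defaults to `None`. With `None`, group-based partition assignment and offset commits are disabled, so the API code should always pass a group id explicitly. The sketch below mirrors the command-line behaviour described above: it uses the configured id, or falls back to a random one. `resolve_group_id` is a hypothetical helper, and the `console-consumer-` prefix is an assumed naming scheme modeled on the console consumer, not something kafka-python does.

```python
import random
from typing import Optional


def resolve_group_id(configured: Optional[str], rng: Optional[random.Random] = None) -> str:
    """Hypothetical helper: return the configured group id, or make up a random one."""
    if configured:
        return configured
    rng = rng or random.Random()
    # Assumed naming scheme, modeled on the ids the console consumer generates
    return "console-consumer-%d" % rng.randint(0, 99999)


print(resolve_group_id("my_group1"))  # my_group1
print(resolve_group_id(None))         # e.g. console-consumer-<random number>
```

The resolved value would then be passed as `group_id=` to `KafkaConsumer`, so every API consumer joins a named group.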
2. Implementation
'''
Description: henggao_learning
version: v1.0.0
Author: henggao
Date: 2022-02-28 14:56:27
LastEditors: henggao
LastEditTime: 2022-03-03 15:49:17
'''
# ========== Read messages from a specified offset ==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Topic name
# topic_name = 'my_favorite_topic2'
# topic_name = 'test_topic'
topic_name = 'first'
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],
    group_id='my_group1',  # API consumers must be given a consumer group id
)
print(consumer.partitions_for_topic(topic_name))  # partitions of topic first
print(consumer.topics())                          # list of topics
print(consumer.subscription())                    # topics this consumer subscribes to

# With a topic subscription, partitions are assigned only after the consumer joins
# the group, so poll until the assignment arrives before calling seek()
# (records returned by these warm-up polls are not printed)
while not consumer.assignment():
    consumer.poll(timeout_ms=100)
print(consumer.assignment())                              # topic/partitions assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest available offset per partition
consumer.seek(TopicPartition(topic=topic_name, partition=0), 5)  # reset the offset: read partition 0 from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
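`consumer.seek(tp, 5)` only moves the fetch position of one partition; the next fetch for that partition starts at offset 5. The sketch below models a partition as a Python list whose index is the offset. That is a simplifying assumption: on a real broker the earliest offset can be greater than 0 once old log segments are deleted, which is what `beginning_offsets()` reports. `consume_from` is a hypothetical name.

```python
from typing import List, Tuple


def consume_from(log: List[str], start_offset: int) -> List[Tuple[int, str]]:
    """Simulate reading one partition from start_offset onward, like consumer.seek(tp, start_offset)."""
    if start_offset < 0:
        raise ValueError("offset must be non-negative")
    # Assumption: each record's offset equals its index in the partition log
    return [(offset, value) for offset, value in enumerate(log) if offset >= start_offset]


partition0 = ["msg_test is %d" % i for i in range(8)]
for offset, value in consume_from(partition0, 5):
    print("first:0:%d: value=%s" % (offset, value))  # offsets 5, 6 and 7
```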
3. Testing
- Start the consumer, which subscribes to one topic.
- Send data from the command line:
root@redis01:/usr/local/kafka# ./bin/kafka-console-producer.sh --bootstrap-server redis01:9092 --topic first
>Hello Java
>
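The console producer sends the line without a key, and the consumer above has no `value_deserializer`, so `key` prints as `None` and `value` prints as raw bytes. The sketch below reproduces the consumer's print format on a stand-in record; the `Record` namedtuple only imitates the fields of kafka-python's `ConsumerRecord`, and the partition and offset values are hypothetical.

```python
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord, with only the fields the example prints
Record = namedtuple("Record", "topic partition offset key value")


def format_record(message: Record) -> str:
    """Same format string as the consumer loop above."""
    return "%s:%d:%d: key=%s value=%s" % (
        message.topic, message.partition, message.offset, message.key, message.value)


print(format_record(Record("first", 0, 5, None, b"Hello Java")))
# first:0:5: key=None value=b'Hello Java'
```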
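Standalone Consumer Example (Subscribing to a Partition)
1. Requirement
- Create a standalone consumer that consumes only the data in partition 0 of topic first.
2. Implementation
- Use assign to subscribe to specific partitions: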
consumer.assign([TopicPartition('first', 0)])
'''
Description: henggao_note
version: v1.0.0
Author: henggao
Date: 2022-03-03 17:05:04
LastEditors: henggao
LastEditTime: 2022-03-03 19:44:25
Des: Consumer with explicitly assigned partitions
'''
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=[
        '192.168.92.145:9092', '192.168.92.146:9092'],  # Kafka cluster addresses
    group_id="my_group1",          # consumer group id
    enable_auto_commit=True,       # periodically auto-commit consumed offsets (commits happen while iterating)
    auto_commit_interval_ms=5000,  # auto-commit interval (milliseconds)
    api_version=(0, 10)
)
# Assign a single partition to the consumer
consumer.assign([TopicPartition('first', 0)])
# Assign several partitions to the consumer
# consumer.assign([TopicPartition('first', 0), TopicPartition('first', 1)])
print(consumer.partitions_for_topic("first"))  # partitions of topic first
print(consumer.topics())                       # list of topics
# print(consumer.subscription())  # no topic subscription exists when partitions are assigned manually
print(consumer.assignment())                              # topic/partitions assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest available offset per assigned partition
for msg in consumer:  # iterator: blocks until the next message arrives
    print(msg)        # print the message
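With `enable_auto_commit=True` and `auto_commit_interval_ms=5000`, offsets are committed at most about once every 5 seconds, and, as the code comment notes, the commit is triggered while the consumer iterates. A consequence is that messages consumed after the last commit can be delivered again if the process crashes. The sketch below models only the deadline logic; `AutoCommitTimer` is a hypothetical class, and the timestamps are simulated milliseconds rather than a real clock.

```python
class AutoCommitTimer:
    """Hypothetical sketch of periodic auto-commit: commit at most once per interval."""

    def __init__(self, interval_ms: int, start_ms: int = 0):
        self.interval_ms = interval_ms
        self.next_deadline_ms = start_ms + interval_ms

    def maybe_commit(self, now_ms: int) -> bool:
        # A commit is due only once the deadline has been reached
        if now_ms >= self.next_deadline_ms:
            self.next_deadline_ms = now_ms + self.interval_ms
            return True
        return False


timer = AutoCommitTimer(interval_ms=5000)
# The consumer iterates at these simulated timestamps (milliseconds)
commits = [t for t in (1000, 4999, 5000, 7000, 10000, 12000) if timer.maybe_commit(t)]
print(commits)  # [5000, 10000]
```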
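3. Testing
- Start the consumer above, then run the following producer, which sends 5 messages to partition 0 of topic first: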
'''
Description: henggao_learning
version: v1.0.0
Author: henggao
Date: 2022-02-28 10:11:09
LastEditors: henggao
LastEditTime: 2022-03-03 22:33:02
'''
from time import sleep

from kafka import KafkaProducer

# topic_name = 'kafka_demo'
topic_name = 'first'


def start_producer():
    producer = KafkaProducer(bootstrap_servers=[
        '192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 5):
        msg = 'msg_test is ' + str(i)
        # Send every message explicitly to partition 0
        producer.send(topic_name, msg.encode('utf-8'), partition=0)
        sleep(3)
    producer.flush()  # deliver any buffered messages before exiting
    producer.close()


if __name__ == '__main__':
    start_producer()
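Because the producer passes `partition=0`, the partitioner is bypassed and all 5 messages land in partition 0. The sketch below simulates that routing for a topic assumed to have 2 partitions (the real partition count of `first` is not stated here); `send_to_partition` is a hypothetical name.

```python
from typing import Dict, List


def send_to_partition(messages: List[str], partition: int, num_partitions: int) -> Dict[int, List[str]]:
    """Simulate producer.send(topic, value, partition=p): every message lands in partition p."""
    if not 0 <= partition < num_partitions:
        raise ValueError("partition %d does not exist" % partition)
    topic = {p: [] for p in range(num_partitions)}
    for msg in messages:
        topic[partition].append(msg)
    return topic


# Assumption: topic first has 2 partitions
topic_first = send_to_partition(["msg_test is %d" % i for i in range(5)], partition=0, num_partitions=2)
print({p: len(msgs) for p, msgs in topic_first.items()})  # {0: 5, 1: 0}
```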
consumer.assign([TopicPartition('first', 0)])
Assigned partition 0: data is received.
consumer.assign([TopicPartition('first', 1)])
Assigned partition 1: no data is received.
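The result follows from how `assign()` works: the consumer fetches only from the partitions it was given, with no group rebalancing involved. The sketch below reproduces the test outcome on a simulated topic where, as an assumption matching the producer above, all 5 messages are in partition 0 and partition 1 is empty. `poll_assigned` is a hypothetical name.

```python
from typing import Dict, List, Tuple


def poll_assigned(topic: Dict[int, List[str]], assigned: List[int]) -> List[Tuple[int, str]]:
    """Simulate consumer.assign([...]): only records from the assigned partitions are returned."""
    return [(p, msg) for p in assigned for msg in topic.get(p, [])]


# Assumed state after the producer test: 5 messages in partition 0, none in partition 1
topic_first = {0: ["msg_test is %d" % i for i in range(5)], 1: []}
print(len(poll_assigned(topic_first, [0])))     # 5
print(len(poll_assigned(topic_first, [1])))     # 0
print(len(poll_assigned(topic_first, [0, 1])))  # 5
```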
Consumer Group Example
1. Requirement
- Verify that the data in one partition of a topic can be consumed by only one consumer within a consumer group.
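Inside a group, the partitions of a subscribed topic are split among the members, and each partition goes to exactly one of them. kafka-python's default `partition_assignment_strategy` lists `RangePartitionAssignor` first. The sketch below shows the range idea for a single topic (contiguous blocks of partitions, with the first members receiving one extra when the division is uneven). It is an illustration, not the library's code; the consumer names are hypothetical, since real member ids are generated when consumers join.

```python
from typing import Dict, List


def range_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Range-style assignment for one topic: contiguous blocks, earlier members get the extras."""
    if not consumers:
        return {}
    parts = sorted(partitions)
    members = sorted(consumers)
    per_consumer, extra = divmod(len(parts), len(members))
    result: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        count = per_consumer + (1 if i < extra else 0)
        result[member] = parts[start:start + count]
        start += count
    return result


print(range_assign([0, 1, 2], ["consumer-a", "consumer-b", "consumer-c"]))
# {'consumer-a': [0], 'consumer-b': [1], 'consumer-c': [2]}
print(range_assign([0, 1, 2, 3], ["consumer-a", "consumer-b", "consumer-c"]))
# {'consumer-a': [0, 1], 'consumer-b': [2], 'consumer-c': [3]}
```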
2. Implementation
Setting group_id="my_group1" in each of the three consumers puts all three in the same group, my_group1.
'''
Description: henggao_note
version: v1.0.0
Author: henggao
Date: 2022-03-03 23:18:41
LastEditors: henggao
LastEditTime: 2022-03-03 23:41:28
'''
import time

from kafka import KafkaConsumer

# topic_name = 'my_favorite_topic2'
topic_name = 'first'


def start_consumer():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092',
                           '192.168.92.147:9092', '192.168.92.148:9092'],
        group_id="my_group1",  # all three consumer processes share this group id
    )
    for msg in consumer:
        print(msg)
        print("topic = %s" % msg.topic)  # the topic is already a string
        print("partition = %d" % msg.partition)
        print("value = %s" % msg.value.decode())  # bytes to string
        # msg.timestamp is in milliseconds since the epoch
        print("time = ", time.strftime("%Y-%m-%d %H:%M:%S",
                                       time.localtime(msg.timestamp / 1000)))


if __name__ == '__main__':
    start_consumer()
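`msg.timestamp` is in milliseconds, which is why the code divides by 1000 before formatting. The consumer uses `time.localtime`, so its output depends on the machine's time zone; the sketch below uses `time.gmtime` instead so that the result is reproducible. The sample timestamp is made up for illustration, and `format_timestamp_ms` is a hypothetical name.

```python
import time


def format_timestamp_ms(timestamp_ms: int) -> str:
    """Convert a Kafka record timestamp (milliseconds since the epoch) to a UTC string."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(timestamp_ms / 1000))


print(format_timestamp_ms(1646322088000))  # 2022-03-03 15:41:28
```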
3. Testing
- Run consumer.py three times to simulate three consumers in one consumer group.
- Send data with producer.py:
'''
Description: henggao_learning
version: v1.0.0
Author: henggao
Date: 2022-02-28 10:11:09
LastEditors: henggao
LastEditTime: 2022-03-03 23:40:25
'''
from time import sleep

from kafka import KafkaProducer

# topic_name = 'kafka_demo'
topic_name = 'first'


def start_producer():
    producer = KafkaProducer(bootstrap_servers=[
        '192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 500):
        msg = 'msg_test is ' + str(i)
        # producer.send(topic_name, msg.encode('utf-8'), partition=0)
        # No partition and no key: the partitioner chooses the partition
        producer.send(topic_name, msg.encode('utf-8'))
        sleep(0.03)
    producer.flush()  # deliver any buffered messages before exiting
    producer.close()


if __name__ == '__main__':
    start_producer()
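This producer sets neither a partition nor a key, so the 500 messages are spread across the partitions of `first`, which is what lets all three consumers receive data. To my understanding, kafka-python's default partitioner picks a random available partition for keyless messages. The sketch below simulates that choice with a seeded random generator for a topic assumed to have 3 partitions; `pick_partition` is a hypothetical name, and this is not the Java client's sticky partitioner.

```python
import random
from collections import Counter


def pick_partition(num_partitions: int, rng: random.Random) -> int:
    """Keyless message: choose a partition at random (modeled on kafka-python's default partitioner)."""
    return rng.randrange(num_partitions)


rng = random.Random(42)  # seeded so the run is reproducible
# Assumption: topic first has 3 partitions
counts = Counter(pick_partition(3, rng) for _ in range(500))
print(sorted(counts.items()))  # roughly 500 / 3 messages per partition
```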
- Result: the data in each partition is consumed by only one consumer in the group.
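The observed result can be stated as a check on the group's assignment: every partition has exactly one owner. A related consequence is that a group can have at most as many active consumers as the topic has partitions; any extra consumer stays idle. The sketch below verifies both properties on a hand-written, hypothetical assignment of 4 consumers to a 3-partition topic; all function names are illustrative.

```python
from typing import Dict, List


def owners_by_partition(assignment: Dict[str, List[int]]) -> Dict[int, List[str]]:
    """Invert a consumer -> partitions map into partition -> consumers."""
    owners: Dict[int, List[str]] = {}
    for consumer, partitions in assignment.items():
        for p in partitions:
            owners.setdefault(p, []).append(consumer)
    return owners


def is_exclusive(assignment: Dict[str, List[int]], partitions: List[int]) -> bool:
    """True if every partition has exactly one owner inside the group."""
    owners = owners_by_partition(assignment)
    return all(len(owners.get(p, [])) == 1 for p in partitions)


def idle_consumers(assignment: Dict[str, List[int]]) -> List[str]:
    """Consumers that received no partition (more consumers than partitions)."""
    return sorted(c for c, owned in assignment.items() if not owned)


# Hypothetical assignment: 4 consumers in my_group1, topic first with 3 partitions
group = {"consumer-a": [0], "consumer-b": [1], "consumer-c": [2], "consumer-d": []}
print(is_exclusive(group, [0, 1, 2]))  # True
print(idle_consumers(group))           # ['consumer-d']
```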