Kafka25: Consumer API

 

Standalone consumer example (subscribing to a topic)

1. Requirements

  • Create a standalone consumer that consumes data from the topic first.
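  • A consumer group id must be configured in consumer API code. A consumer started from the command line without a group id is automatically given a random one. (kafka-python itself defaults group_id to None, which disables group coordination and offset commits, so the script in step 2 still runs without one.)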
 

2. Implementation steps

'''
Description: henggao_learning
version: v1.0.0
Author: henggao
Date: 2022-02-28 14:56:27
LastEditors: henggao
LastEditTime: 2022-03-03 15:49:17
'''
# ========== Read messages from a specified position ==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Name of the topic to consume
# topic_name = 'my_favorite_topic2'
# topic_name = 'test_topic'
topic_name = 'first'

consumer = KafkaConsumer(topic_name, bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'])

print(consumer.partitions_for_topic(topic_name))  # partition ids of the topic first
print(consumer.topics())  # list of topics
print(consumer.subscription())  # topics this consumer is subscribed to
print(consumer.assignment())  # topic partitions currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest available offset of each assigned partition

# seek() requires the partition to be assigned already, so poll until it is
partition_0 = TopicPartition(topic=topic_name, partition=0)
while partition_0 not in consumer.assignment():
    consumer.poll(timeout_ms=100)
consumer.seek(partition_0, 5)  # reset the position: start consuming from offset 5

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
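The consumer loop above prints key and value as raw bytes (or None when a record has no key). As a minimal sketch, assuming the messages are UTF-8 text, a small decoding helper can be passed to KafkaConsumer through its key_deserializer and value_deserializer parameters. The helper name decode_utf8 is made up for this illustration.

```python
from typing import Optional


def decode_utf8(raw: Optional[bytes]) -> Optional[str]:
    """Decode a Kafka key or value, passing None through (e.g. a record without a key)."""
    if raw is None:
        return None
    return raw.decode("utf-8")


# Hypothetical usage with kafka-python (needs a reachable broker):
# consumer = KafkaConsumer(
#     "first",
#     bootstrap_servers=["192.168.92.145:9092", "192.168.92.146:9092"],
#     key_deserializer=decode_utf8,
#     value_deserializer=decode_utf8,
# )

print(decode_utf8(b"Hello Java"))
print(decode_utf8(None))
```

With the deserializers set, message.key and message.value arrive in the loop as str (or None) instead of bytes.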

3. Testing

  1. The consumer subscribes to one topic
  2. Send data from the command line
root@redis01:/usr/local/kafka# ./bin/kafka-console-producer.sh --bootstrap-server redis01:9092 --topic first
>Hello Java
>

Standalone consumer example (subscribing to a partition)

1. Requirements

  • Assign a specific partition to the consumer.

2. Implementation steps

  • consumer.py
  • Use assign to subscribe to a partition: consumer.assign([TopicPartition('first', 0)])
'''
Description: henggao_note
version: v1.0.0
Author: henggao
Date: 2022-03-03 17:05:04
LastEditors: henggao
LastEditTime: 2022-03-03 19:44:25
Des: Consumer with assigned partitions
'''
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],  # Kafka cluster addresses
    group_id="my_group1",  # consumer group id
    enable_auto_commit=True,  # periodically auto-commit the offsets of consumed messages (committed while iterating)
    auto_commit_interval_ms=5000,  # auto-commit interval in milliseconds
    api_version=(0, 10)
)

# Assign a single partition to the consumer
consumer.assign([TopicPartition('first', 0)])
# Assign multiple partitions to the consumer
# consumer.assign([TopicPartition('first', 0), TopicPartition('first', 1)])

print(consumer.partitions_for_topic("first"))  # partition ids of the topic first
print(consumer.topics())  # list of topics
# print(consumer.subscription())  # topics this consumer is subscribed to
print(consumer.assignment())  # topic partitions currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest available offset of each assigned partition

# for msg in consumer:  # iterator; blocks waiting for the next message
#     print(msg)  # print the message

3. Testing

  • producer.py
'''
Description: henggao_learning
version: v1.0.0
Author: henggao
Date: 2022-02-28 10:11:09
LastEditors: henggao
LastEditTime: 2022-03-03 22:33:02
'''
from kafka import KafkaProducer
from time import sleep

# topic_name = 'kafka_demo'
topic_name = 'first'


def start_producer():
    producer = KafkaProducer(bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 5):
        msg = 'msg_test is ' + str(i)
        producer.send(topic_name, msg.encode('utf-8'), partition=0)  # always write to partition 0
        sleep(3)
    producer.flush()  # make sure buffered messages are sent before exiting


if __name__ == '__main__':
    start_producer()
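  • consumer.assign([TopicPartition('first', 0)]) subscribes to partition 0: data is received
  • consumer.assign([TopicPartition('first', 1)]) subscribes to partition 1: no data is received, because the producer writes only to partition 0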
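The two observations above can also be checked without reading any messages, by comparing a partition's beginning and end offsets. The sketch below relies only on the beginning_offsets and end_offsets methods of KafkaConsumer. The helper retained_offsets and the FakeConsumer stand-in are hypothetical and exist so the example runs without a broker; the offsets in the demo assume partition 0 holds only the five messages sent by producer.py.

```python
def retained_offsets(consumer, tp):
    """Return end offset minus beginning offset for one partition."""
    begin = consumer.beginning_offsets([tp])[tp]
    end = consumer.end_offsets([tp])[tp]
    return end - begin


class FakeConsumer:
    """Stand-in exposing only the two methods used above; not part of kafka-python."""

    def __init__(self, begin, end):
        self._begin = begin
        self._end = end

    def beginning_offsets(self, partitions):
        return {tp: self._begin[tp] for tp in partitions}

    def end_offsets(self, partitions):
        return {tp: self._end[tp] for tp in partitions}


# Plain tuples stand in for TopicPartition('first', 0) and TopicPartition('first', 1)
p0, p1 = ("first", 0), ("first", 1)
fake = FakeConsumer(begin={p0: 0, p1: 0}, end={p0: 5, p1: 0})

print(retained_offsets(fake, p0))  # 5: the five messages sent to partition 0
print(retained_offsets(fake, p1))  # 0: nothing was sent to partition 1
```

Against a real cluster, the same helper would be called as retained_offsets(consumer, TopicPartition('first', 0)). The difference is a count of offsets rather than an exact message count if the topic is compacted or uses transactions.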
 

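Consumer group example

1. Requirements

  • Requirement: verify that the data of a given partition of a topic can be consumed by only one consumer within a consumer group.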

2. Implementation steps

  • group_id="my_group1" places all three consumers in the same group, my_group1
'''
Description: henggao_note
version: v1.0.0
Author: henggao
Date: 2022-03-03 23:18:41
LastEditors: henggao
LastEditTime: 2022-03-03 23:41:28
'''
from kafka import KafkaConsumer
import time

# topic_name = 'my_favorite_topic2'
topic_name = 'first'


def start_consumer():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'],
        group_id="my_group1",
    )
    for msg in consumer:
        print(msg)
        print("topic = %s" % msg.topic)  # the topic is a string by default
        print("partition = %d" % msg.partition)
        print("value = %s" % msg.value.decode())  # bytes to string
        print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg.timestamp / 1000)))  # timestamp is in milliseconds


if __name__ == '__main__':
    start_consumer()
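The consumer above divides msg.timestamp by 1000 because Kafka record timestamps are expressed in milliseconds since the Unix epoch, and it formats the result in the machine's local time zone. As a small sketch of the same conversion with a fixed time zone (UTC is chosen here only to make the output reproducible, and the function name is made up):

```python
from datetime import datetime, timezone


def format_timestamp_ms(timestamp_ms: int) -> str:
    """Convert a record timestamp in milliseconds since the epoch to a UTC string."""
    moment = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


print(format_timestamp_ms(0))  # 1970-01-01 00:00:00
```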

3. Testing

  1. Run three instances of consumer.py to simulate three consumers in one consumer group
  2. Send data with producer.py
'''
Description: henggao_learning
version: v1.0.0
Author: henggao
Date: 2022-02-28 10:11:09
LastEditors: henggao
LastEditTime: 2022-03-03 23:40:25
'''
from kafka import KafkaProducer
from time import sleep

# topic_name = 'kafka_demo'
topic_name = 'first'


def start_producer():
    producer = KafkaProducer(bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 500):
        msg = 'msg_test is ' + str(i)
        # producer.send(topic_name, msg.encode('utf-8'), partition=0)
        producer.send(topic_name, msg.encode('utf-8'))  # no partition given: the producer's partitioner chooses one
        sleep(0.03)
    producer.flush()  # make sure buffered messages are sent before exiting


if __name__ == '__main__':
    start_producer()
  3. Result: the data of one partition is consumed by only one consumer in the consumer group
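The result follows from how a group divides a topic's partitions among its members: every partition is handed to exactly one consumer. The following is a simplified model of range-style assignment, written for illustration only. It is not kafka-python's own assignor code, and the consumer names and partition counts are made up, since the notes do not state how many partitions first has.

```python
def range_assign(partitions, consumers):
    """Give each partition to exactly one consumer, in contiguous ranges."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    members = sorted(consumers)
    per_member, extra = divmod(len(partitions), len(members))
    assignment = {}
    start = 0
    for index, member in enumerate(members):
        # The first `extra` members each take one additional partition
        count = per_member + (1 if index < extra else 0)
        assignment[member] = list(partitions[start:start + count])
        start += count
    return assignment


# Hypothetical group of three consumers on a topic with three partitions
result = range_assign([0, 1, 2], ["consumer-a", "consumer-b", "consumer-c"])
print(result)  # {'consumer-a': [0], 'consumer-b': [1], 'consumer-c': [2]}
```

When a group has more consumers than the topic has partitions, the extra consumers receive an empty list in this model, which corresponds to group members that sit idle.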