Kafka28:消费者Roundrobin分配

Kafka28:消费者Roundrobin分配

RoundRobin 分区策略原理

  • RoundRobin 针对集群中所有Topic而言。
 
  • RoundRobin 轮询分区策略,是把所有的partition 和所有的consumer 都列出来,然后按照hashcode 进行排序,最后通过轮询算法来分配partition 给到各个消费者。
notion image

RoundRobin 分区分配策略案例

创建消费者

  • 依次在Consumer1Consumer2Consumer3三个消费者代码中修改分区分配策略为RoundRobin
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-03 23:18:41 LastEditors: henggao LastEditTime: 2022-03-05 10:54:35 ''' from kafka import KafkaConsumer from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor # from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor import time # topic_name = 'my_favorite_topic2' topic_name = 'first' def start_consumer(): consumer = KafkaConsumer(topic_name, bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'], group_id="my_group3", partition_assignment_strategy=[RoundRobinPartitionAssignor], ) for msg in consumer: # print(msg) # print("topic = %s" % msg.topic) # topic default is string # print("partition = %d" % msg.partition) # print("value = %s" % msg.value.decode()) # bytes to string # print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", # time.localtime(msg.timestamp/1000))) print("topic = %s" % msg.topic + ", partition = %d" % msg.partition + ", value = %s" % msg.value.decode()) # topic default is string if __name__ == '__main__': start_consumer()
notion image

创建生成者

  • producter.py ,发送500条数据,随机发送到不同的分区。
''' Description: henggao_learning version: v1.0.0 Author: henggao Date: 2022-02-28 10:11:09 LastEditors: henggao LastEditTime: 2022-03-04 09:31:49 ''' from concurrent.futures import thread from kafka import KafkaProducer from time import sleep, thread_time # topic_name = 'kafka_demo' topic_name = 'first' def start_producer(): producer = KafkaProducer(bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092']) for i in range(0, 500): msg = 'msg_test is ' + str(i) # producer.send(topic_name, msg.encode('utf-8') ,partition=0) producer.send(topic_name, msg.encode('utf-8')) sleep(0.03) if __name__ == '__main__': start_producer()

查看

notion image
 
  • 可以发现consumer1.pyconsumer2.pyconsumer3.py ,3 个消费者分别消费哪些分区的数据
消费者
消费分区数据
conusmer1
1,4
conusmer2
0,3,6
conusmer3
2,5

场景一:Consumer1挂掉一段时间内(45s以内)

停止掉consumer2消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
  1. consumer2消费者的任务会整体被分配到consumer1消费者或者consumer3消费者。
  1. consumer1消费者:消费到0,1,4 ,6号分区数据。
  1. consumer3消费者:消费到2,3,5号分区数据。
说明:consumer2消费者挂掉后,消费者组需要按照超时时间45s 来判断它是否退出,所以需 要等待,时间到了45s 后,判断它真的退出就会把任务分配给其他broker 执行。
消费者
消费分区数据
停止consumer1后消费分区数据
conusmer1
1,4
1,4,0,6
conusmer2
0,3,6
conusmer3
2,5
2,5,3

场景二:Consumer1挂掉一段时间后(45s后)

再次重新发送消息观看结果(45s 以后)。
  1. consumer1消费者:消费到0,2,4,6号分区数据。
  1. consumer3消费者:消费到1,3,5号分区数据。
说明:consumer1消费者已经被踢出消费者组,所以重新按照roundobin方式分配。
消费者
消费分区数据
conusmer1
0,2,4,6
conusmer3
1,3,5