RoundRobin 分区策略原理RoundRobin 分区分配策略案例创建消费者创建生成者查看场景一:Consumer1挂掉一段时间内(45s以内)场景二:Consumer1挂掉一段时间后(45s后)
RoundRobin 分区策略原理
- RoundRobin 针对集群中所有Topic而言。
- RoundRobin 轮询分区策略,是把所有的partition 和所有的consumer 都列出来,然后按照hashcode 进行排序,最后通过轮询算法来分配partition 给到各个消费者。
RoundRobin 分区分配策略案例
创建消费者
- 依次在
Consumer1
、Consumer2
、Consumer3
三个消费者代码中修改分区分配策略为RoundRobin
。
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-03 23:18:41 LastEditors: henggao LastEditTime: 2022-03-05 10:54:35 ''' from kafka import KafkaConsumer from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor # from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor import time # topic_name = 'my_favorite_topic2' topic_name = 'first' def start_consumer(): consumer = KafkaConsumer(topic_name, bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'], group_id="my_group3", partition_assignment_strategy=[RoundRobinPartitionAssignor], ) for msg in consumer: # print(msg) # print("topic = %s" % msg.topic) # topic default is string # print("partition = %d" % msg.partition) # print("value = %s" % msg.value.decode()) # bytes to string # print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", # time.localtime(msg.timestamp/1000))) print("topic = %s" % msg.topic + ", partition = %d" % msg.partition + ", value = %s" % msg.value.decode()) # topic default is string if __name__ == '__main__': start_consumer()
创建生成者
producter.py
,发送500条数据,随机发送到不同的分区。
''' Description: henggao_learning version: v1.0.0 Author: henggao Date: 2022-02-28 10:11:09 LastEditors: henggao LastEditTime: 2022-03-04 09:31:49 ''' from concurrent.futures import thread from kafka import KafkaProducer from time import sleep, thread_time # topic_name = 'kafka_demo' topic_name = 'first' def start_producer(): producer = KafkaProducer(bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092']) for i in range(0, 500): msg = 'msg_test is ' + str(i) # producer.send(topic_name, msg.encode('utf-8') ,partition=0) producer.send(topic_name, msg.encode('utf-8')) sleep(0.03) if __name__ == '__main__': start_producer()
查看
- 可以发现
consumer1.py
、consumer2.py
、consumer3.py
,3 个消费者分别消费哪些分区的数据
消费者 | 消费分区数据 |
conusmer1 | 1,4 |
conusmer2 | 0,3,6 |
conusmer3 | 2,5 |
场景一:Consumer1挂掉一段时间内(45s以内)
停止掉consumer2消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
- consumer2消费者的任务会整体被分配到consumer1消费者或者consumer3消费者。
- consumer1消费者:消费到0,1,4 ,6号分区数据。
- consumer3消费者:消费到2,3,5号分区数据。
说明:consumer2消费者挂掉后,消费者组需要按照超时时间45s 来判断它是否退出,所以需
要等待,时间到了45s 后,判断它真的退出就会把任务分配给其他broker 执行。
消费者 | 消费分区数据 | 停止consumer1后消费分区数据 |
conusmer1 | 1,4 | 1,4,0,6 |
ㅤ | ||
conusmer3 | 2,5 | 2,5,3 |
场景二:Consumer1挂掉一段时间后(45s后)
再次重新发送消息观看结果(45s 以后)。
- consumer1消费者:消费到0,2,4,6号分区数据。
- consumer3消费者:消费到1,3,5号分区数据。
说明:consumer1消费者已经被踢出消费者组,所以重新按照roundobin方式分配。
消费者 | 消费分区数据 |
conusmer1 | 0,2,4,6 |
conusmer3 | 1,3,5 |