Kafka27:消费者Range分配

Kafka27:消费者Range分配

 

Range 以及再平衡

1. Range 分区策略原理

  • 注意
    • 如果只是针对1 个topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有N 多个topic,那么针对每个topic,消费者C0都将多消费1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费N 个分区。
    • 容易产生数据倾斜!

2. Range 分区策略案例

  1. 修改主题first 为7 个分区
# 增加first主题的分区数 root@redis01:/usr/local/kafka# bin/kafka-topics.sh --bootstrap-server redis01:9092 --alter --topic first --partitions 7 # 查看 root@redis01:/usr/local/kafka# bin/kafka-topics.sh --bootstrap-server redis01:9092 --describe --topic first
🚀Tips:分区数通过命令行只能增加,不能减少。
notion image
  1. 创建消费者
consumer1.pyconsumer2.pyconsumer3.py 内容一样,这样可以由三个消费者Consumer1、Consumer2、Consumer3 组成消费者组,组名都为“my_group1”,同时启动3 个消费者。
  • 注意partition_assignment_strategy=[RangePartitionAssignor] ,需要导入from kafka.coordinator.assignors.range import RangePartitionAssignor
  • 小插曲:python-Kafka模块测试失败,我在这里这样partition_assignment_strategy=['RangePartitionAssignor'] 导入,一直报错,让我花了好久一会儿,我原本认为是python-Kafka库已经设置好了字符转化,原来需要自己from kafka.coordinator.assignors.range import RangePartitionAssignor使用。🤣令我高兴的是,尝试了另一个Python的Kafka库confluent_kafka 。😁
notion image
 
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-03 23:18:41 LastEditors: henggao LastEditTime: 2022-03-05 10:54:35 ''' from kafka import KafkaConsumer from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor # from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor import time # topic_name = 'my_favorite_topic2' topic_name = 'first' def start_consumer(): consumer = KafkaConsumer(topic_name, bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'], group_id="my_group2", partition_assignment_strategy=[RangePartitionAssignor], ) for msg in consumer: # print(msg) # print("topic = %s" % msg.topic) # topic default is string # print("partition = %d" % msg.partition) # print("value = %s" % msg.value.decode()) # bytes to string # print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", # time.localtime(msg.timestamp/1000))) print("topic = %s" % msg.topic + ", partition = %d" % msg.partition + ", value = %s" % msg.value.decode()) # topic default is string if __name__ == '__main__': start_consumer()
notion image
  1. 创建生成者
  • producter.py ,发送500条数据,随机发送到不同的分区。
''' Description: henggao_learning version: v1.0.0 Author: henggao Date: 2022-02-28 10:11:09 LastEditors: henggao LastEditTime: 2022-03-04 09:31:49 ''' from concurrent.futures import thread from kafka import KafkaProducer from time import sleep, thread_time # topic_name = 'kafka_demo' topic_name = 'first' def start_producer(): producer = KafkaProducer(bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092']) for i in range(0, 500): msg = 'msg_test is ' + str(i) # producer.send(topic_name, msg.encode('utf-8') ,partition=0) producer.send(topic_name, msg.encode('utf-8')) sleep(0.03) if __name__ == '__main__': start_producer()
  1. 查看
notion image
  • 可以发现consumer1.pyconsumer2.pyconsumer3.py ,3 个消费者分别消费哪些分区的数据
消费者
消费分区数据
conusmer1
3,4
conusmer2
0,1,2
conusmer3
5,6
说明: Kafka 默认的分区分配策略就是 Range + CooperativeSticky 所以不需要修改策略。

3. Range分区分配再平衡案例

场景一:Consumer1挂掉一段时间内(45s以内)

停止掉consumer1消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
  1. consumer1消费者的任务会整体被分配到consumer2消费者或者consumer3消费者。
  1. consumer2消费者:消费到0,1,2,3,4 号分区数据。
  1. consumer3消费者:消费到5,6号分区数据。
说明:consumer1消费者挂掉后,消费者组需要按照超时时间45s 来判断它是否退出,所以需 要等待,时间到了45s 后,判断它真的退出就会把任务分配给其他broker 执行。
消费者
消费分区数据
停止consumer1后消费分区数据
conusmer1
3,4
conusmer2
0,1,2
0,1,2
conusmer3
5,6
3,4,5,6

场景二:Consumer1挂掉一段时间后(45s后)

再次重新发送消息观看结果(45s 以后)。
  1. consumer2消费者:消费到0,1,2,3号分区数据。
  1. consumer3消费者:消费到4,5,6号分区数据。
说明:consumer1消费者已经被踢出消费者组,所以重新按照range方式分配。
消费者
消费分区数据
conusmer2
0,1,2,3
conusmer3
4,5,6
 
 
 

另一个库:confluent_kafka

Global configuration properties
Property
C/P
Default
Importance
Description
partition.assignment.strategy
C
range,roundrobin
medium
he name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. Type: string