Kafka 29: Consumer Sticky Assignment


 

How the Sticky Partition Strategy Works

Definition of sticky partitioning:

  • "Sticky" means the assignment result carries over: before a new assignment is computed, the previous assignment is taken into account and changes are kept to a minimum, which avoids a lot of rebalancing overhead.
  • Kafka introduced this assignment strategy in version 0.11.x. It first places partitions on consumers as evenly as possible; when a consumer in the group fails, it keeps the remaining consumers' existing partition assignments unchanged as far as possible.
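As an illustration only (this is a hypothetical toy sketch, not kafka-python's actual algorithm), the sticky idea can be summed up as: when a consumer leaves, the survivors keep the partitions they already own, and only the orphaned partitions are redistributed.

```python
# Toy sketch of the "sticky" principle: survivors keep their partitions,
# orphaned partitions go to the currently least-loaded survivor.
def sticky_reassign(assignment, alive):
    """assignment: {consumer: [partitions]}; alive: consumers still in the group."""
    new = {c: list(parts) for c, parts in assignment.items() if c in alive}
    orphaned = [p for c, parts in assignment.items() if c not in alive
                for p in parts]
    for p in orphaned:
        target = min(new, key=lambda c: len(new[c]))  # least-loaded survivor
        new[target].append(p)
    return new


before = {'consumer1': [0, 1, 2], 'consumer2': [3, 5], 'consumer3': [4, 6]}
after = sticky_reassign(before, alive={'consumer2', 'consumer3'})
# consumer2 and consumer3 keep 3,5 and 4,6; only 0,1,2 are moved
```

Note how the survivors' original partitions never move; this is the property that distinguishes sticky assignment from recomputing a Range or RoundRobin assignment from scratch.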
 

Sticky Partition Assignment Strategy Example

Create the consumers

  • Change the partition assignment strategy to Sticky in each of the three consumers: Consumer1, Consumer2, and Consumer3.
```python
'''
Description: henggao_note
version: v1.0.0
Author: henggao
Date: 2022-03-03 23:18:41
LastEditors: henggao
LastEditTime: 2022-03-05 10:54:35
'''
from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
# from kafka.coordinator.assignors.range import RangePartitionAssignor
# from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

# topic_name = 'my_favorite_topic2'
topic_name = 'first'


def start_consumer():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092',
                           '192.168.92.147:9092', '192.168.92.148:9092'],
        group_id="my_group6",
        partition_assignment_strategy=[StickyPartitionAssignor],
    )
    for msg in consumer:
        # msg.topic is a str, msg.value is bytes
        print("topic = %s" % msg.topic
              + ", partition = %d" % msg.partition
              + ", value = %s" % msg.value.decode())


if __name__ == '__main__':
    start_consumer()
```

Note: an error from the kafka-python library

  • When the Sticky assignor is used, kafka-python throws an error (error screenshot omitted).

Fix

  • Find sticky_assignor.py in your pip-installed kafka-python package, at kafka/coordinator/assignors/sticky/sticky_assignor.py.
  • Change six.iteritems to six.viewitems and save.
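For context (my reading of the substitution, not an explanation taken from the kafka-python source): on Python 3, six.iteritems(d) returns a one-shot iterator, while six.viewitems(d) returns a reusable dict view, which matters if the mapping is iterated more than once. A plain-dict illustration, using only the stdlib equivalents of the two helpers:

```python
# What the two six helpers reduce to on Python 3 (six itself not imported)
d = {'a': 1, 'b': 2}

items_iter = iter(d.items())   # six.iteritems(d): single-use iterator
items_view = d.items()         # six.viewitems(d): reusable view with len()

first_pass = sorted(items_iter)
second_pass = sorted(items_iter)   # the iterator is now exhausted: empty

print(first_pass)    # [('a', 1), ('b', 2)]
print(second_pass)   # []
print(sorted(items_view), sorted(items_view))  # the view works every time
```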

Create the producer

  • producer.py sends 500 messages, randomly distributed to different partitions.
```python
'''
Description: henggao_learning
version: v1.0.0
Author: henggao
Date: 2022-02-28 10:11:09
LastEditors: henggao
LastEditTime: 2022-03-04 09:31:49
'''
from time import sleep

from kafka import KafkaProducer

# topic_name = 'kafka_demo'
topic_name = 'first'


def start_producer():
    producer = KafkaProducer(bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092',
                                                '192.168.92.147:9092', '192.168.92.148:9092'])
    for i in range(500):
        msg = 'msg_test is ' + str(i)
        # no partition argument, so the producer's partitioner picks one
        # producer.send(topic_name, msg.encode('utf-8'), partition=0)
        producer.send(topic_name, msg.encode('utf-8'))
        sleep(0.03)
    producer.flush()  # make sure everything is delivered before exiting


if __name__ == '__main__':
    start_producer()
```

Check the results

 
 
  • Observe which partitions each of the three consumers consumer1.py, consumer2.py, and consumer3.py consumes:

| Consumer | Partitions consumed |
| --- | --- |
| consumer1 | 0, 1, 2 |
| consumer2 | 3, 5 |
| consumer3 | 4, 6 |

Scenarios

Scenario 1: Consumer1 is down for a short time (within 45 s)

Stop consumer1, then quickly resend messages and observe the result (within 45 s, the sooner the better).
  1. consumer1's partitions are reassigned as a whole to consumer2 and consumer3.
  2. consumer2 consumes partitions 1, 2, 3, and 5.
  3. consumer3 consumes partitions 0, 4, and 6.
Note: after consumer1 goes down, the consumer group uses the 45 s session timeout to decide whether it has really left, so you have to wait; only once the 45 s are up and consumer1 is judged to have left are its partitions assigned to the other consumers.
| Consumer | Partitions consumed | Partitions consumed after stopping consumer1 |
| --- | --- | --- |
| consumer1 | 0, 1, 2 | (none) |
| consumer2 | 3, 5 | 1, 2, 3, 5 |
| consumer3 | 4, 6 | 0, 4, 6 |
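The 45 s threshold used in this experiment is the group's session timeout (the Kafka 3.x broker default). As a sketch, the relevant knobs are real KafkaConsumer keyword arguments in kafka-python; the values below simply mirror common defaults rather than anything this experiment changed:

```python
# Timeout knobs relevant to the experiment (KafkaConsumer keyword arguments)
consumer_config = dict(
    group_id='my_group6',
    session_timeout_ms=45_000,      # broker waits this long without heartbeats
                                    # before declaring the consumer dead
    heartbeat_interval_ms=3_000,    # how often the consumer sends heartbeats
    max_poll_interval_ms=300_000,   # max allowed time between poll() calls
)
# usage sketch: KafkaConsumer(topic_name, bootstrap_servers=[...], **consumer_config)
```

The heartbeat interval must be well below the session timeout so that several missed heartbeats, not one, are needed before the broker gives up on the consumer.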

Scenario 2: after Consumer1 has been down for a while (more than 45 s)

Resend messages again and observe the result (after 45 s have passed).
  1. consumer2 consumes partitions 0, 1, and 2.
  2. consumer3 consumes partitions 3, 4, 5, and 6.
Note: consumer1 has been removed from the consumer group, so the partitions are reassigned according to the sticky strategy.
| Consumer | Partitions consumed |
| --- | --- |
| consumer2 | 0, 1, 2 |
| consumer3 | 3, 4, 5, 6 |
 
 

Another library: confluent_kafka

Global configuration properties (from the librdkafka configuration reference):

| Property | C/P | Default | Importance | Description |
| --- | --- | --- | --- | --- |
| partition.assignment.strategy | C | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. Type: string |
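A minimal configuration sketch for confluent_kafka, assuming the same brokers and group id as the earlier examples; cooperative-sticky is one of the documented values of partition.assignment.strategy. The Consumer calls are commented out so the fragment does not require the library or a running broker:

```python
# Consumer configuration for confluent_kafka (a sketch, not a full program)
# from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': '192.168.92.145:9092,192.168.92.146:9092',
    'group.id': 'my_group6',
    'partition.assignment.strategy': 'cooperative-sticky',
    'auto.offset.reset': 'earliest',
}
# consumer = Consumer(conf)
# consumer.subscribe(['first'])
```

Unlike kafka-python, which takes assignor classes, confluent_kafka (librdkafka) selects the strategy by name in the config dict.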