指定offset消费
auto.offset.reset= earliest | latest | none
默认是 latest 。当Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
- earliest :自动将偏移量重置为最早的偏移量
—from-beginning
。
- latest (默认值 ):自动将偏移量重置为最新偏移量。
- none :如果未找到消费者组的先前偏移量,则向消费者抛出异常
指定offset位移开始消费
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-05 19:52:08 LastEditors: henggao LastEditTime: 2022-03-05 23:26:15 ''' from tokenize import group from kafka import KafkaConsumer # topic_name = 'my_favorite_topic2' topic_name = 'first' def start_consumer(): consumer = KafkaConsumer(topic_name, bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'], ) print(consumer.topics()) print(consumer.assignment()) assignment = consumer.assignment() # 保证分区分配方案已经制定 while (assignment.__sizeof__ == 0): consumer.poll() assignment = consumer.assignment() for topic in assignment: consumer.seek(topic, 2500) # 从2500开始消费 for msg in consumer: print(msg) if __name__ == '__main__': start_consumer()