Kafka34: Consuming by Time


Consuming by Time

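Requirement: in production you may discover that the data consumed over the last few hours was faulty, and you want to consume it again starting from a given time. For example, if you are asked to re-consume the previous day's data, how do you do it?

Approach: convert the start time to a millisecond timestamp, ask the broker for the matching offset in each assigned partition with offsets_for_times(), seek() each partition to that offset, and then consume as usual.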
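The consumer below starts exactly 24 hours before now. If "the previous day" is instead meant as the previous calendar day, both ends of that day can first be computed as millisecond timestamps. This is a minimal sketch; previous_day_window_ms is a hypothetical helper, and it assumes the caller passes a timezone-aware datetime so the midnight boundary is unambiguous.

```python
import datetime
from typing import Tuple


def previous_day_window_ms(now: datetime.datetime) -> Tuple[int, int]:
    """Return (start_ms, end_ms) bounding the calendar day before `now`.

    Pass a timezone-aware `now` so the midnight boundary is unambiguous.
    Kafka record timestamps are milliseconds since the Unix epoch.
    """
    # Midnight at the start of the day that contains `now`
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # Midnight at the start of the previous day
    yesterday_midnight = today_midnight - datetime.timedelta(days=1)
    # Midnights have no fractional seconds, so int() loses nothing here
    return (int(yesterday_midnight.timestamp()) * 1000,
            int(today_midnight.timestamp()) * 1000)
```

In the consumer, start_ms would take the place of the one-day-ago timestamp. end_ms marks where a replay limited to that day would stop, because offsets_for_times() only supplies a starting point and the consumer otherwise keeps reading newer records.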

Implementation Example

  • consumer.py
```python
'''
Description: henggao_note
version: v1.0.0
Author: henggao
Date: 2022-03-05 19:52:08
LastEditors: henggao
LastEditTime: 2022-03-06 11:05:49
'''
import datetime
import time

from kafka import KafkaConsumer

# Start time: exactly one day (24 hours) before now
day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
# Kafka record timestamps are milliseconds since the Unix epoch
time_stamp = int(time.mktime(day_ago.timetuple())) * 1000

# Topic to consume
topic_name = 'first'


def start_consumer():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=[
            '192.168.92.145:9092',
            '192.168.92.146:9092',
            '192.168.92.147:9092',
            '192.168.92.148:9092',
        ],
    )
    print(consumer.topics())  # all topics in the cluster

    # Partitions are assigned lazily by poll(); keep polling until the
    # assignment is non-empty so that seek() has partitions to act on
    assignment = consumer.assignment()
    while not assignment:
        consumer.poll(timeout_ms=100)
        assignment = consumer.assignment()
    print(assignment)  # partitions assigned to this consumer

    # For each partition, find the earliest offset whose timestamp is >= time_stamp, e.g.
    # {TopicPartition(topic='first', partition=4): OffsetAndTimestamp(offset=693, timestamp=1646449104646)}
    offsets = consumer.offsets_for_times({tp: time_stamp for tp in assignment})
    for tp, offset_and_ts in offsets.items():
        if offset_and_ts is None:
            # No record at or after time_stamp in this partition: start at the end
            consumer.seek_to_end(tp)
        else:
            consumer.seek(tp, offset_and_ts.offset)

    # Consume from the offsets set above
    for msg in consumer:
        print(msg)


if __name__ == '__main__':
    start_consumer()
```
 
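The consumer above starts at the requested time but then keeps reading up to the newest records. If only a bounded window, such as the previous calendar day, should be replayed, one option is to also look up the offset for the window end and stop there. This is a minimal sketch, assuming kafka-python and a consumer created without a topic subscription (so that assign() may be used). plan_replay and replay_window are hypothetical helpers, and the window is only as precise as the record timestamps, which are assumed to increase roughly with offsets.

```python
from typing import Callable, Dict, Iterable


def plan_replay(partitions: Iterable, start_hits: Dict, end_hits: Dict,
                log_ends: Dict) -> Dict:
    """Return {partition: (start_offset, stop_offset)} for partitions with data in the window.

    start_hits and end_hits are the dicts returned by offsets_for_times() for the
    window start and end (a value is None when no record at or after that
    timestamp exists); log_ends is the dict returned by end_offsets().
    """
    plan = {}
    for tp in partitions:
        start = start_hits.get(tp)
        if start is None:
            continue  # nothing at or after the window start in this partition
        end = end_hits.get(tp)
        # Nothing at or after the window end yet: replay up to the current log end
        stop = end.offset if end is not None else log_ends[tp]
        if start.offset < stop:
            plan[tp] = (start.offset, stop)
    return plan


def replay_window(consumer, partitions: Iterable, start_ms: int, end_ms: int,
                  handle: Callable, poll_timeout_ms: int = 500) -> None:
    """Pass every record in the offset range for [start_ms, end_ms) to handle().

    Assumes `consumer` is a kafka-python KafkaConsumer with no topic subscription.
    Loops until every planned partition has reached its stop offset.
    """
    partitions = list(partitions)
    plan = plan_replay(
        partitions,
        consumer.offsets_for_times({tp: start_ms for tp in partitions}),
        consumer.offsets_for_times({tp: end_ms for tp in partitions}),
        consumer.end_offsets(partitions),
    )
    if not plan:
        return
    consumer.assign(list(plan))  # manual assignment: no group rebalancing
    for tp, (start, _) in plan.items():
        consumer.seek(tp, start)

    remaining = {tp: stop for tp, (_, stop) in plan.items()}
    while remaining:
        for tp, records in consumer.poll(timeout_ms=poll_timeout_ms).items():
            stop = remaining.get(tp)
            for rec in records:
                # Skip records past the stop offset (or from finished partitions)
                if stop is not None and rec.offset < stop:
                    handle(rec)
        # A partition is done once its next fetch position reaches its stop offset
        for tp in list(remaining):
            if consumer.position(tp) >= remaining[tp]:
                consumer.pause(tp)
                del remaining[tp]
```

A caller would build the partition list from partitions_for_topic('first') wrapped in TopicPartition objects, then pass the start and end timestamps in milliseconds. Keeping the offset arithmetic in plan_replay lets it be checked without a running broker.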

Output

[Image: console output of consumer.py]