自动提交offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
消费者自动提交offset
参数名称 | 描述 |
enable_auto_commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
auto_commit_interval_ms | 如果设置了 enable.auto.commit 的值为true, 则该值定义了消
费者偏移量向Kafka 提交的频率,默认5s。 |
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-05 19:52:08 LastEditors: henggao LastEditTime: 2022-03-05 19:53:35 ''' from kafka import KafkaConsumer from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor import time # topic_name = 'my_favorite_topic2' topic_name = 'first' def start_consumer(): consumer = KafkaConsumer(topic_name, bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'], group_id="my_group8", enable_auto_commit=True, auto_commit_interval_ms=1000) for msg in consumer: # print(msg) print("topic = %s" % msg.topic + ", partition = %d" % msg.partition + ", value = %s" % msg.value.decode()) # topic default is string if __name__ == '__main__': start_consumer()