Kafka31:自动提交offset

Kafka31:自动提交offset

自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
notion image

消费者自动提交offset

参数名称
描述
enable_auto_commit
默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto_commit_interval_ms
如果设置了 enable.auto.commit 的值为true, 则该值定义了消 费者偏移量向Kafka 提交的频率,默认5s
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-05 19:52:08 LastEditors: henggao LastEditTime: 2022-03-05 19:53:35 ''' from kafka import KafkaConsumer from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor import time # topic_name = 'my_favorite_topic2' topic_name = 'first' def start_consumer(): consumer = KafkaConsumer(topic_name, bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'], group_id="my_group8", enable_auto_commit=True, auto_commit_interval_ms=1000) for msg in consumer: # print(msg) print("topic = %s" % msg.topic + ", partition = %d" % msg.partition + ", value = %s" % msg.value.decode()) # topic default is string if __name__ == '__main__': start_consumer()