Kafka32:手动提交offset

Kafka32:手动提交offset

手动提交offset

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
notion image

手动提交案例

1. 同步提交

  • 由于同步提交offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提 交的效率比较低。以下为同步提交offset 的示例。

2. 异步提交

  • 虽然同步提交offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会 受 到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-05 19:52:08 LastEditors: henggao LastEditTime: 2022-03-05 21:29:16 ''' from kafka import KafkaConsumer # topic_name = 'my_favorite_topic2' topic_name = 'first' def start_consumer(): consumer = KafkaConsumer(topic_name, bootstrap_servers=[ '192.168.92.145:9092', '192.168.92.146:9092', '192.168.92.147:9092', '192.168.92.148:9092'], group_id="my_group8", enable_auto_commit=False, auto_commit_interval_ms=1000) for msg in consumer: # print(msg) print("topic = %s" % msg.topic + ", partition = %d" % msg.partition + ", value = %s" % msg.value.decode()) # topic default is string # consumer.commit() # 同步提交 consumer.commit_async() # 异步提交 if __name__ == '__main__': start_consumer()