Kafka40:Python库confluent_kafka使用

Kafka40:Python库confluent_kafka使用

安装

pip install confluent-kafka

使用confluent_kafka查看Kafka集群分区数据

  • consumer.py
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-04 20:54:20 LastEditors: henggao LastEditTime: 2022-03-04 21:09:10 ''' from confluent_kafka import Consumer c = Consumer({ 'bootstrap.servers': '192.168.92.145:9092 , 192.168.92.146:9092', # kafka所在ip地址,多个地址用逗号隔开。 'group.id': 'mygroup', 'auto.offset.reset': 'earliest' }) c.subscribe(['first']) while True: msg = c.poll(1) if msg is None: continue else: if not msg.error() is None: print(msg.error()) else: message = msg.value() print(msg.partition()) print(msg.offset())

生产数据与消费数据

  • producer.py
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-04 21:19:21 LastEditors: henggao LastEditTime: 2022-03-04 21:25:50 ''' from confluent_kafka import Producer p = Producer( {'bootstrap.servers': '192.168.92.145:9092 , 192.168.92.146:9092'}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format( msg.topic(), msg.partition())) for i in range(0, 10): # Trigger any available delivery report callbacks from previous produce() calls msg = 'msg_test is ' + str(i) p.poll(0) # Asynchronously produce a message, the delivery report callback # will be triggered from poll() above, or flush() below, when the message has # been successfully delivered or failed permanently. # p.produce('mytopic', data.encode('utf-8'), callback=delivery_report) p.produce('mytopic', msg, callback=delivery_report) # Wait for any outstanding messages to be delivered and delivery report # callbacks to be triggered. p.flush()
  • consumer.py
''' Description: henggao_note version: v1.0.0 Author: henggao Date: 2022-03-04 20:54:20 LastEditors: henggao LastEditTime: 2022-03-04 21:24:26 ''' from confluent_kafka import Consumer c = Consumer({ 'bootstrap.servers': '192.168.92.145:9092 , 192.168.92.146:9092', # kafka所在ip地址,多个地址用逗号隔开。 'group.id': 'mygroup', 'auto.offset.reset': 'earliest' }) c.subscribe(['mytopic']) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): print("Consumer error: {}".format(msg.error())) continue # print('Received message: {}'.format(msg.value().decode('utf-8'))) print('Received message: {}'.format(msg.value())) c.close()
notion image
 
 
参数配置