Kafka41:Proudcer生成json

Kafka41:Proudcer生成json

Kafka

''' Description: henggao_note version: v1.0.0 Date: 2022-03-14 15:29:38 LastEditors: henggao LastEditTime: 2022-03-14 16:12:20 ''' import json from kafka import KafkaConsumer, KafkaProducer class KProducer: def __init__(self, bootstrap_servers, topic): """ kafka 生产者 :param bootstrap_servers: 地址 :param topic: topic """ self.producer = KafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda m: json.dumps(m).encode('ascii'), ) # json 格式化发送的内容 self.topic = topic # self.api_version = (3, 1, 0) def sync_producer(self, data_li: list): """ 同步发送 数据 :param data_li: 发送数据 :return: """ for data in data_li: future = self.producer.send(self.topic, data) record_metadata = future.get(timeout=10) # 同步确认消费 partition = record_metadata.partition # 数据所在的分区 offset = record_metadata.offset # 数据所在分区的位置 print('save success, partition: {}, offset: {}'.format(partition, offset)) def asyn_producer(self, data_li: list): """ 异步发送数据 :param data_li:发送数据 :return: """ for data in data_li: self.producer.send(self.topic, data) self.producer.flush() # 批量提交 def asyn_producer_callback(self, data_li: list): """ 异步发送数据 + 发送状态处理 :param data_li:发送数据 :return: """ for data in data_li: self.producer.send(self.topic, data).add_callback( self.send_success).add_errback(self.send_error) self.producer.flush() # 批量提交 def send_success(self, *args, **kwargs): """异步发送成功回调函数""" print('save success') return def send_error(self, *args, **kwargs): """异步发送错误回调函数""" print('save error') return def close_producer(self): try: self.producer.close() except: pass if __name__ == '__main__': send_data_li = [{"id": 2}, {"name": 'henggao'}] kp = KProducer(topic='flinkdemo', bootstrap_servers='192.168.92.145:9092') #Tips:如果这里使用redis01:9092,需要在Windows机器的hosts里添加:(IP地址1 节点名1)192.168.92.145 redis01,否则第二次运行会报错:kafka.errors.NoBrokersAvailable: NoBrokersAvailable # 同步发送 # kp.sync_producer(send_data_li) # 异步发送 # kp.asyn_producer(send_data_li) # 异步+回调 kp.asyn_producer_callback(send_data_li) kp.close_producer()