Kafka 6: Kafka Producer


1. Producer Message Send Flow

1.1 How Sending Works

Sending a message involves two threads: the main thread and the Sender thread. The main thread creates a double-ended queue, the RecordAccumulator, and writes messages into it; the Sender thread continuously pulls messages from the RecordAccumulator and sends them to the Kafka brokers.
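The main-thread/Sender-thread handoff can be sketched with a simplified in-memory accumulator. This is a toy model for illustration only (names like `record_queue` and the batch size of 2 are made up, not kafka-python internals):

```python
import threading
from collections import deque

# Simplified stand-in for the RecordAccumulator: a lock-protected deque.
record_queue = deque()
lock = threading.Lock()
sent = []  # stands in for messages delivered to the broker

def main_thread_produce(messages):
    """Main thread: append records to the accumulator."""
    for m in messages:
        with lock:
            record_queue.append(m)

def sender_thread_drain():
    """Sender thread: pull batches from the accumulator and 'send' them."""
    while True:
        with lock:
            if not record_queue:
                break
            batch = [record_queue.popleft() for _ in range(min(2, len(record_queue)))]
        sent.extend(batch)  # in real Kafka this is a network send to a broker

main_thread_produce(['m0', 'm1', 'm2'])
t = threading.Thread(target=sender_thread_drain)
t.start()
t.join()
print(sent)  # ['m0', 'm1', 'm2'] — records drained in order
```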

1.2 Asynchronous Send API

Simulate asynchronous sending with Python.

1.2.1 Install kafka-python

```shell
pip3 install kafka-python
```

1.2.2 Asynchronous Send

1. Configure — 2. Create the producer — 3. Send data — 4. Close resources
  1. Producer, producer.py
```python
from kafka import KafkaProducer
from time import sleep

def start_producer():
    producer = KafkaProducer(bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 5):
        msg = 'msg is ' + str(i)
        # value must be bytes, so encode the string
        producer.send('my_favorite_topic2', msg.encode('utf-8'))
        sleep(3)

if __name__ == '__main__':
    start_producer()
```
  • Parameter notes:
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
result = future.get(timeout=10)
print(result)
```
    • producer.send sends a message
    • topic: the topic name; required
    • key: the key; must be a byte string; optional (but at least one of key and value must be given); defaults to None
    • value: the value; must be a byte string; optional (but at least one of key and value must be given); defaults to None
    • partition: the partition to send to; Kafka is configured with a single partition by default, hence 0
    • future.get waits until this message is sent or the timeout expires; in testing, without this call the message never actually goes out (a time.sleep also works)
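Since key and value must be byte strings, a small helper can normalize Python values before calling send. The helper name `to_bytes` is illustrative, not a kafka-python API:

```python
def to_bytes(v):
    """Encode a value for kafka-python's send(), which requires bytes or None."""
    if v is None:
        return None
    if isinstance(v, bytes):
        return v
    if isinstance(v, str):
        return v.encode('utf-8')
    return str(v).encode('utf-8')

print(to_bytes('msg is 0'))  # b'msg is 0'
print(to_bytes(None))        # None
print(to_bytes(3))           # b'3'
```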
 
  1. Consumer, consumer.py
```python
from kafka import KafkaConsumer
import time

def start_consumer():
    consumer = KafkaConsumer('my_favorite_topic2', bootstrap_servers='192.168.92.145:9092')
    for msg in consumer:
        print(msg)
        print("topic = %s" % msg.topic)          # topic is a string
        print("partition = %d" % msg.partition)
        print("offset = %d" % msg.offset)
        print("value = %s" % msg.value.decode()) # bytes to string
        print("timestamp = %d" % msg.timestamp)
        print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg.timestamp / 1000)))

if __name__ == '__main__':
    start_consumer()
```
  • Parameter notes
    • topic
    • partition
    • offset: this message's offset
    • timestamp: the timestamp
    • timestamp_type: the timestamp type
    • key: the key, as bytes
    • value: the value, as bytes
    • checksum: the message checksum
    • serialized_key_size: size of the serialized key
    • serialized_value_size: size of the serialized value; when value=None the size is -1
    • auto_offset_reset='earliest' fetches the earliest readable messages. auto_offset_reset resets the offset: 'earliest' moves to the earliest available message, 'latest' to the newest; the default is 'latest'. Defined in the source as {'smallest': 'earliest', 'largest': 'latest'}
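The legacy-name mapping quoted from the source can be sketched as a plain lookup. This is a simplified model of the behavior, not the library's actual code path:

```python
# Legacy consumer names map onto the newer auto_offset_reset values.
LEGACY_OFFSET_RESET = {'smallest': 'earliest', 'largest': 'latest'}

def resolve_offset_reset(value='latest'):
    """Translate a legacy name if one is given; the default stays 'latest'."""
    return LEGACY_OFFSET_RESET.get(value, value)

print(resolve_offset_reset('smallest'))  # earliest
print(resolve_offset_reset('largest'))   # latest
print(resolve_offset_reset())            # latest
```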
  1. Output when running the consumer in VS Code
  • The consumer can likewise be monitored from an Ubuntu terminal

1.2.3 Synchronous Send

  • Synchronous send (call get() to wait for Kafka's response and determine whether the message was sent successfully)
```python
import pickle
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v)
)

start_time = time.time()
for i in range(0, 5):
    print('------{}---------'.format(i))
    future = producer.send(topic="test_topic", key="num", value=i)
    # Synchronous blocking: calling get() guarantees a degree of ordering.
    try:
        record_metadata = future.get(timeout=10)
        # print(record_metadata.topic)
        # print(record_metadata.partition)
        # print(record_metadata.offset)
    except KafkaError as e:
        print(str(e))

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
```
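Because the producer above pickles its keys and values, a matching consumer would need `pickle.loads` as its deserializer; a quick round-trip check of that assumption:

```python
import pickle

# What the producer's key_serializer / value_serializer do:
key_bytes = pickle.dumps("num")
value_bytes = pickle.dumps(3)

# What a matching consumer's deserializers would have to undo:
assert pickle.loads(key_bytes) == "num"
assert pickle.loads(value_bytes) == 3
print("pickle round-trip ok")
```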
  • Terminal monitoring output

2. Custom Partitioning

  • producer.py
```python
from kafka import KafkaProducer
from time import sleep

# topic_name = 'my_favorite_topic2'
topic_name = 'test_topic'

def start_producer():
    producer = KafkaProducer(bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 5):
        msg = 'msg_kafka is ' + str(i)
        # Custom partitioning: route by whether the message contains "kafka".
        # The target partitions must exist; by default only partition 0 does.
        # Create 3 partitions from the terminal with:
        # ./bin/kafka-topics.sh --bootstrap-server redis01:9092 --topic test_topic --alter --partitions 3
        if "kafka" in msg:
            user_partition = 2
        else:
            user_partition = 1
        producer.send(topic_name, msg.encode('utf-8'), partition=user_partition)
        sleep(3)

if __name__ == '__main__':
    start_producer()
```
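Instead of hard-coding the partition on each send, kafka-python also accepts a `partitioner` callable on the producer, invoked with the serialized key. A sketch of similar "contains kafka" routing as a partitioner (note it sees the key, not the value; partition ids 1 and 2 are assumed to exist):

```python
def kafka_key_partitioner(key_bytes, all_partitions, available):
    """Route records whose key contains b'kafka' to partition 2, others to 1.

    Follows kafka-python's partitioner signature:
    (key_bytes, all_partitions, available_partitions) -> partition id.
    """
    if key_bytes is not None and b'kafka' in key_bytes:
        return 2
    return 1

# The partitioner itself is a pure function, so it can be checked directly:
parts = [0, 1, 2]
print(kafka_key_partitioner(b'msg_kafka is 0', parts, parts))  # 2
print(kafka_key_partitioner(b'msg is 0', parts, parts))        # 1

# Hooking it up (requires a reachable broker):
# producer = KafkaProducer(bootstrap_servers=[...], partitioner=kafka_key_partitioner)
```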
 
 

3. Asynchronous Send + Callbacks

```python
import pickle
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v)
)

def on_send_success(*args, **kwargs):
    """Callback invoked when a send succeeds."""
    print("success")
    return args

def on_send_error(*args, **kwargs):
    """Errback invoked when a send fails."""
    print("error")
    return args

start_time = time.time()
for i in range(0, 10):
    print('------{}---------'.format(i))
    # On success the callback receives the record_metadata;
    # on failure the errback receives the Exception.
    producer.send(
        topic="test_topic", key="num", value=i
    ).add_callback(on_send_success).add_errback(on_send_error)

producer.flush()
producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
```
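The add_callback/add_errback chaining above can be modeled with a tiny future-like stub. This is a simplified illustration of the dispatch pattern, not kafka-python's actual Future class:

```python
class MiniFuture:
    """Minimal model of a future supporting callback/errback chaining."""

    def __init__(self):
        self._callbacks, self._errbacks = [], []

    def add_callback(self, fn):
        self._callbacks.append(fn)
        return self  # enables chaining, as in send(...).add_callback(...).add_errback(...)

    def add_errback(self, fn):
        self._errbacks.append(fn)
        return self

    def success(self, record_metadata):
        for fn in self._callbacks:
            fn(record_metadata)

    def failure(self, exc):
        for fn in self._errbacks:
            fn(exc)

results = []
f = (MiniFuture()
     .add_callback(lambda md: results.append(('ok', md)))
     .add_errback(lambda e: results.append(('err', e))))
f.success({'topic': 'test_topic', 'partition': 0})
print(results)  # [('ok', {'topic': 'test_topic', 'partition': 0})]
```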
 