1. Producer Message Sending Flow
1.1 How Sending Works
Message sending involves two threads: the main thread and the Sender thread. The main thread creates a double-ended queue, the RecordAccumulator, and writes messages into it; the Sender thread continuously pulls messages from the RecordAccumulator and ships them to the Kafka broker.
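How aggressively the RecordAccumulator batches records before the Sender ships them can be tuned when the producer is constructed. A minimal kafka-python sketch (the broker address is a placeholder; batch_size and buffer_memory are the library defaults, while linger_ms is raised from its default of 0 for illustration):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # placeholder broker address
    batch_size=16384,        # max bytes per per-partition batch in the accumulator
    linger_ms=5,             # let records linger briefly so the Sender can batch them
    buffer_memory=33554432,  # total memory the accumulator may use for buffering
)
```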
1.2 Asynchronous Send API
Simulating asynchronous sending with Python.
1.2.1 Install the kafka-python package
```bash
pip3 install kafka-python
```
1.2.2 Asynchronous Send
Workflow: 1. configure → 2. create the producer → 3. send data → 4. close resources
- Producer:
producer.py
```python
from kafka import KafkaProducer
from time import sleep

def start_producer():
    producer = KafkaProducer(bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 5):
        msg = 'msg is ' + str(i)
        # the value must be bytes, so encode the string before sending
        producer.send('my_favorite_topic2', msg.encode('utf-8'))
        sleep(3)

if __name__ == '__main__':
    start_producer()
```
- Parameter notes:
  - topic: the topic name; required
  - key: the key, must be a byte string; optional (but at least one of key and value must be given); defaults to None
  - value: the value, must be a byte string; optional (but at least one of key and value must be given); defaults to None
  - partition: the partition to send to; Kafka creates topics with a single partition by default, hence 0
  - future.get() waits for a single message to be acknowledged or to time out. In testing this call is required, otherwise the process exits before the background Sender flushes the message; time.sleep() works as a substitute.
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
result = future.get(timeout=10)  # wait up to 10 s for the broker's acknowledgement
print(result)
```

producer.send() is the call that actually sends the message.
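As an alternative to future.get() or time.sleep(), the producer's flush() method blocks until everything buffered in the accumulator has been delivered; a minimal sketch reusing the producer above:

```python
producer.send('my_topic', b'another value')
producer.flush()  # block until all buffered records have been sent
```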
- Consumer:
consumer.py
```python
from kafka import KafkaConsumer
import time

def start_consumer():
    consumer = KafkaConsumer('my_favorite_topic2', bootstrap_servers='192.168.92.145:9092')
    for msg in consumer:
        print(msg)
        print("topic = %s" % msg.topic)            # topic is a plain string
        print("partition = %d" % msg.partition)
        print("offset = %d" % msg.offset)
        print("value = %s" % msg.value.decode())   # value is bytes; decode to string
        print("timestamp = %d" % msg.timestamp)
        # msg.timestamp is in milliseconds; convert to seconds for time.localtime
        print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg.timestamp / 1000)))

if __name__ == '__main__':
    start_consumer()
```
- Parameter notes
- topic
- partition
  - offset: the offset of this message
  - timestamp: timestamp
  - timestamp_type: timestamp type
  - key: the key, bytes
  - value: the value, bytes
  - checksum: checksum of the message
  - serialized_key_size: size of the serialized key
  - serialized_value_size: size of the serialized value; note that when value=None the size is -1
  - auto_offset_reset='earliest' lets the consumer fetch the earliest readable messages. auto_offset_reset resets the offset when no committed offset is available: 'earliest' moves to the oldest available message, 'latest' to the newest; the default is 'latest'. The source maps the legacy names: {'smallest': 'earliest', 'largest': 'latest'} (see the sketch after this list).
- Running the consumer in VS Code shows the received messages.
- The consumer can likewise be monitored from an Ubuntu terminal.
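A minimal sketch of a consumer that starts from the earliest retained offset (the group id is a made-up name for illustration; auto_offset_reset only applies when the group has no committed offset yet):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_favorite_topic2',
    bootstrap_servers='192.168.92.145:9092',
    auto_offset_reset='earliest',  # start from the oldest retained message
    group_id='demo_group',         # hypothetical consumer-group name
)
for msg in consumer:
    print(msg.offset, msg.value)
```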
1.2.3 Synchronous Send
- Synchronous send: call get() on the returned future to wait for Kafka's response and determine whether the message was sent successfully.
```python
import pickle
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v)
)

start_time = time.time()
for i in range(0, 5):
    print('------{}---------'.format(i))
    future = producer.send(topic="test_topic", key="num", value=i)
    # Block on get(): each send waits for the broker's response, which also
    # preserves ordering between the messages.
    try:
        record_metadata = future.get(timeout=10)
        # record_metadata.topic / .partition / .offset are available here
    except KafkaError as e:
        print(str(e))

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
```
- The sent messages can be observed in the monitoring terminal.
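Because this producer pickles its keys and values, a consumer of test_topic needs matching deserializers to read them back; a minimal sketch under that assumption:

```python
import pickle
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v),
)
for msg in consumer:
    print(msg.key, msg.value)  # e.g. 'num' 0 ... 'num' 4
```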
2. Custom Partitioning
producer.py
```python
from kafka import KafkaProducer
from time import sleep

# topic_name = 'my_favorite_topic2'
topic_name = 'test_topic'

def start_producer():
    producer = KafkaProducer(bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'])
    for i in range(0, 5):
        msg = 'msg_kafka is ' + str(i)
        # Pick a partition by hand: messages containing "kafka" go to
        # partition 2, everything else to partition 1. The target partitions
        # must already exist (a topic has a single partition, 0, by default);
        # grow the topic to 3 partitions from the terminal with:
        #   ./bin/kafka-topics.sh --bootstrap-server redis01:9092 --topic test_topic --alter --partitions 3
        if "kafka" in msg:
            user_partition = 2
        else:
            user_partition = 1
        producer.send(topic_name, msg.encode('utf-8'), partition=user_partition)
        sleep(3)

if __name__ == '__main__':
    start_producer()
```
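Instead of passing partition= on every send() call, kafka-python also accepts a partitioner callable when the producer is constructed; it receives the serialized key plus the partition lists and is consulted whenever send() does not name a partition explicitly. A sketch of that variant (the even/odd routing rule is made up for illustration):

```python
from kafka import KafkaProducer

def even_odd_partitioner(key_bytes, all_partitions, available_partitions):
    # Hypothetical rule: keys ending in an even byte go to the first
    # partition, everything else goes to the last one.
    if key_bytes and key_bytes[-1] % 2 == 0:
        return all_partitions[0]
    return all_partitions[-1]

producer = KafkaProducer(
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],
    partitioner=even_odd_partitioner,
)
producer.send('test_topic', key=b'0', value=b'routed by key')
```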
3. Asynchronous Send with Callback Functions
```python
import pickle
import time
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['192.168.92.145:9092', '192.168.92.146:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v)
)

def on_send_success(*args, **kwargs):
    """Callback invoked when a send succeeds."""
    print("success")
    return args

def on_send_error(*args, **kwargs):
    """Errback invoked when a send fails."""
    print("error")
    return args

start_time = time.time()
for i in range(0, 10):
    print('------{}---------'.format(i))
    # On success the callback receives the RecordMetadata; on failure the
    # errback receives the Exception.
    producer.send(
        topic="test_topic", key="num", value=i
    ).add_callback(on_send_success).add_errback(on_send_error)

producer.flush()
producer.close()
end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
```
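For a more informative success handler, note that add_callback passes the resulting RecordMetadata to the callback; a small sketch of a callback that logs where each message landed:

```python
def on_send_success(record_metadata):
    # RecordMetadata carries the topic, partition, and offset assigned by the broker
    print("delivered to %s[%d] @ offset %d" % (
        record_metadata.topic, record_metadata.partition, record_metadata.offset))
```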