Flink10: Receiving Kafka Producer Data with Flink on a Local Windows Machine

0. Preparation

Version information

  • ZooKeeper: 3.7.0
  • Kafka: kafka_2.13-3.1.0
  • Flink: Apache Flink 1.14.3
  • apache-flink (PyFlink): 1.14.4
  • kafka-python: 2.0.2

Additional JAR packages

  • By default the pyflink/lib directory only ships with a handful of JARs, so the Kafka connector JAR has to be added before this example will run (based on the connector used below, this is the flink-sql-connector-kafka JAR matching your Flink version); a sketch of an alternative that avoids copying JARs follows below.
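
If copying JARs into pyflink/lib is inconvenient, a minimal alternative sketch is to hand the connector JAR to the job through the `pipeline.jars` option. The local path and JAR file name here are assumptions, not taken from the original setup:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

# Assumed path and file name: use the flink-sql-connector-kafka JAR that matches your Flink version.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///D:/flink/jars/flink-sql-connector-kafka_2.12-1.14.4.jar")
```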

Environment setup

  1. Start the ZooKeeper cluster first
  2. Then start the Kafka cluster
  3. Run the program locally (a connectivity/topic check is sketched below)
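
Before starting the two scripts, it can help to confirm that the broker is reachable from Windows and that the `flinkdemo` topic exists. This is a minimal sketch using kafka-python's admin client, assuming the broker address `192.168.92.145:9092` and a single-broker setup as in the scripts below:

```python
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

# Assumed broker address; adjust to your environment.
admin = KafkaAdminClient(bootstrap_servers='192.168.92.145:9092')
try:
    # Create the topic used by both scripts; 1 partition / 1 replica is enough for a single broker.
    admin.create_topics([NewTopic(name='flinkdemo', num_partitions=1, replication_factor=1)])
    print('topic flinkdemo created')
except TopicAlreadyExistsError:
    print('topic flinkdemo already exists')
finally:
    admin.close()
```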

1. Flink

  • flink_kafka_print.py
```python
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-14 14:48:09
LastEditors: henggao
LastEditTime: 2022-03-14 15:59:13
'''
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1. Create the TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

# 2. Create the source table (reads JSON records from the Kafka topic 'flinkdemo')
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        name VARCHAR
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'flinkdemo',
        'properties.bootstrap.servers' = 'redis01:9092',
        'properties.group.id' = 'test_Print',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")

# 3. Create the sink table (prints every row to stdout)
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        name VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")

# 4. Query the source table and run the computation
# Create a table via the Table API:
source_table = table_env.from_path("datagen")
# Or create a table via a SQL query:
# source_table = table_env.sql_query("SELECT * FROM datagen")

result_table = source_table.select(source_table.id, source_table.name)
print("result table:", type(result_table))
# print("r data: ", result_table.to_pandas())

# 5. Write the result to the sink table
# Write the Table API result table to the sink table:
result_table.execute_insert("print").wait()
# Or write to the sink table via a SQL statement:
# table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()
```

2. Kafka producer

  • producer_json.py
```python
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-14 15:29:38
LastEditors: henggao
LastEditTime: 2022-03-14 16:25:51
'''
import json

from kafka import KafkaProducer


class KProducer:
    def __init__(self, bootstrap_servers, topic):
        """
        Kafka producer
        :param bootstrap_servers: broker address(es)
        :param topic: topic to send to
        """
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda m: json.dumps(m).encode('ascii'),
        )  # serialize outgoing messages as JSON
        self.topic = topic
        # self.api_version = (3, 1, 0)

    def sync_producer(self, data_li: list):
        """
        Send data synchronously
        :param data_li: data to send
        """
        for data in data_li:
            future = self.producer.send(self.topic, data)
            record_metadata = future.get(timeout=10)  # block until the send is acknowledged
            partition = record_metadata.partition     # partition the record was written to
            offset = record_metadata.offset           # offset within that partition
            print('save success, partition: {}, offset: {}'.format(partition, offset))

    def asyn_producer(self, data_li: list):
        """
        Send data asynchronously
        :param data_li: data to send
        """
        for data in data_li:
            self.producer.send(self.topic, data)
        self.producer.flush()  # flush the batch

    def asyn_producer_callback(self, data_li: list):
        """
        Send data asynchronously, with delivery callbacks
        :param data_li: data to send
        """
        for data in data_li:
            self.producer.send(self.topic, data).add_callback(
                self.send_success).add_errback(self.send_error)
        self.producer.flush()  # flush the batch

    def send_success(self, *args, **kwargs):
        """Callback for a successful asynchronous send"""
        print('save success')

    def send_error(self, *args, **kwargs):
        """Callback for a failed asynchronous send"""
        print('save error')

    def close_producer(self):
        try:
            self.producer.close()
        except Exception:
            pass


if __name__ == '__main__':
    # send_data_li = [{"id": 2}, {"name": 'henggao'}]
    send_data_li = [{"id": 2, "name": "henggao"}]
    # Tip: if you use 'redis01:9092' here, add "192.168.92.145 redis01" (IP + hostname) to the
    # Windows hosts file, otherwise the second run fails with
    # kafka.errors.NoBrokersAvailable: NoBrokersAvailable
    kp = KProducer(topic='flinkdemo', bootstrap_servers='192.168.92.145:9092')
    # Synchronous send
    # kp.sync_producer(send_data_li)
    # Asynchronous send
    # kp.asyn_producer(send_data_li)
    # Asynchronous send with callbacks
    kp.asyn_producer_callback(send_data_li)
    kp.close_producer()
```
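
If the Flink job prints nothing, a quick way to isolate the problem is to consume the topic directly with kafka-python: if the messages show up here, Kafka is working and the issue is on the Flink side (missing connector JAR, wrong topic name, etc.). A minimal sketch, assuming the same broker and topic as above:

```python
import json
from kafka import KafkaConsumer

# Assumed broker address and topic, matching the scripts above.
consumer = KafkaConsumer(
    'flinkdemo',
    bootstrap_servers='192.168.92.145:9092',
    auto_offset_reset='earliest',          # read from the beginning of the topic
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    consumer_timeout_ms=10000)             # stop iterating after 10s without new messages

for msg in consumer:
    print(msg.partition, msg.offset, msg.value)
consumer.close()
```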

3. Results

  • Run flink_kafka_print.py first, then run producer_json.py.