0. Preparation
Version info
- Flink: Apache Flink 1.14.3
Additional JARs
- Download the JARs this example needs (ideally matching your own versions 💡) and make them visible to PyFlink, as shown in the sketch after this list
- kafka-clients-3.1.0.jar
- flink-connector-kafka_2.12-1.14.3.jar
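When the PyFlink job is run locally with python, the two JARs also have to be registered with the job itself. A minimal sketch, placed right after table_env is created in the section 1 script; the file paths are placeholders and should point at wherever you saved the JARs:
# Assumption: replace /path/to with the real download location of the JARs.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-connector-kafka_2.12-1.14.3.jar;"
    "file:///path/to/kafka-clients-3.1.0.jar")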
Environment setup
- Start the ZooKeeper cluster first
- Then start the Kafka cluster (and make sure the flinkdemo topic exists; see the sketch after this list)
- Run the programs locally
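The flinkdemo topic used by both scripts must exist on the broker. If auto topic creation is disabled, a short kafka-python sketch can create it; the broker address, partition count, and replication factor below are assumptions for a single-node test setup:
from kafka.admin import KafkaAdminClient, NewTopic

# Assumption: single broker at redis01:9092, one partition, no replication.
admin = KafkaAdminClient(bootstrap_servers='redis01:9092')
admin.create_topics([NewTopic(name='flinkdemo', num_partitions=1, replication_factor=1)])
admin.close()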
1. Flink
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-14 14:48:09
LastEditors: henggao
LastEditTime: 2022-03-14 15:59:13
'''
from pyflink.table import EnvironmentSettings, TableEnvironment
# 1. Create the TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

# 2. Create the source table
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        name VARCHAR
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'flinkdemo',
        'properties.bootstrap.servers' = 'redis01:9092',
        'properties.group.id' = 'test_Print',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")

# 3. Create the sink table
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        name VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")

# 4. Query the source table and run the computation
# Create a table via the Table API:
source_table = table_env.from_path("datagen")
# Or create a table via a SQL query:
# source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id, source_table.name)
print("result table:", type(result_table))
# print("result data:", result_table.to_pandas())

# 5. Write the results to the sink table
# Write the Table API result table into the sink table:
result_table.execute_insert("print").wait()
# Or insert into the sink table via a SQL statement:
# table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()
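For local debugging there is also the option of pulling the result rows back into the Python process instead of going through the print sink. A minimal sketch; note that with an unbounded Kafka source this loop keeps running until the job is cancelled:
# Alternative to the print sink: collect rows in the driver for inspection.
with result_table.execute().collect() as results:
    for row in results:
        print(row)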
2. Kafka producer
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-14 15:29:38
LastEditors: henggao
LastEditTime: 2022-03-14 16:25:51
'''
import json
from kafka import KafkaProducer


class KProducer:
    def __init__(self, bootstrap_servers, topic):
        """
        Kafka producer
        :param bootstrap_servers: broker address(es)
        :param topic: topic to send to
        """
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda m: json.dumps(m).encode('ascii'))  # serialize outgoing payloads as JSON
        self.topic = topic
        # self.api_version = (3, 1, 0)

    def sync_producer(self, data_li: list):
        """
        Send data synchronously
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            future = self.producer.send(self.topic, data)
            record_metadata = future.get(timeout=10)  # block until the broker acknowledges the record
            partition = record_metadata.partition  # partition the record was written to
            offset = record_metadata.offset  # offset within that partition
            print('save success, partition: {}, offset: {}'.format(partition, offset))

    def asyn_producer(self, data_li: list):
        """
        Send data asynchronously
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data)
        self.producer.flush()  # flush buffered records

    def asyn_producer_callback(self, data_li: list):
        """
        Send data asynchronously with delivery callbacks
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data).add_callback(
                self.send_success).add_errback(self.send_error)
        self.producer.flush()  # flush buffered records

    def send_success(self, *args, **kwargs):
        """Callback for successful asynchronous sends"""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """Errback for failed asynchronous sends"""
        print('save error')
        return

    def close_producer(self):
        try:
            self.producer.close()
        except Exception:
            pass


if __name__ == '__main__':
    # send_data_li = [{"id": 2}, {"name": 'henggao'}]
    send_data_li = [{
        "id": 2,
        "name": "henggao"
    }]
    # Tip: if you use redis01:9092 here, add the line "192.168.92.145 redis01" (IP address + hostname)
    # to the Windows machine's hosts file; otherwise the second run fails with
    # kafka.errors.NoBrokersAvailable: NoBrokersAvailable
    kp = KProducer(topic='flinkdemo', bootstrap_servers='192.168.92.145:9092')
    # Synchronous send
    # kp.sync_producer(send_data_li)
    # Asynchronous send
    # kp.asyn_producer(send_data_li)
    # Asynchronous send with callbacks
    kp.asyn_producer_callback(send_data_li)
    kp.close_producer()
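Before involving Flink at all, it can help to confirm that the messages really land in the topic. A minimal kafka-python consumer sketch, assuming the same broker address as above; it reads the topic from the beginning and prints each decoded value:
import json
from kafka import KafkaConsumer

# Assumption: same single broker as the producer above.
consumer = KafkaConsumer(
    'flinkdemo',
    bootstrap_servers='192.168.92.145:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
    print(message.value)  # e.g. {'id': 2, 'name': 'henggao'}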
3. Run results
- Run flink_kafka_print.py first, then run producer_json.py
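Assuming the JARs, topic, and broker address are all set up as above, the Flink job's print sink writes each consumed record to its stdout as a changelog-style row; for the record sent above that should look something like +I[2, henggao] (the exact prefix printed by the task may differ).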