参数设置参考
PyFlink
pyflink_mysql.py
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-13 22:50:02
LastEditors: henggao
LastEditTime: 2022-03-15 15:47:10
'''
# -*-coding:UTF-8-*-
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.common.restart_strategy import RestartStrategies
# Build the streaming execution environment and a TableEnvironment on top
# of it, then stream station passenger counts from Kafka into MySQL.
env = StreamExecutionEnvironment.get_execution_environment()
# The automatically assigned parallelism can request more slots than a test
# deployment provides and the job would fail to start, so pin it to 1.
env.set_parallelism(1)
# A local deployment does not need a restart strategy.
env.set_restart_strategy(RestartStrategies.no_restart())
# NOTE: the blink planner has been the default since Flink 1.11, so the
# deprecated use_blink_planner() call is intentionally omitted.
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().build())

# Kafka source table: one CSV record per passenger-count event.
sourceKafkaDdl = """
CREATE TABLE sourceKafka (
 stationId INT,
 stationName VARCHAR,
 passengerNum INT
) WITH (
 'connector' = 'kafka',
 'topic' = 'flinkdemo',
 'properties.zookeeper.connect' = '192.168.92.145:2181/kafka',
 'properties.bootstrap.servers' = '192.168.92.145:9092',
 'properties.group.id' = 'test_Print',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'csv',
 'csv.ignore-parse-errors' = 'true'
)
"""
# MySQL sink table. Adjust the host, port and database name to match your
# own setup.
# NOTE(review): credentials are hard-coded; for anything beyond a local demo
# they should come from configuration or the environment.
mysqlSinkDdl = """
CREATE TABLE test3 (
 `stationId` INT,
 `stationName` VARCHAR,
 `passengerNum` INT,
 PRIMARY KEY (stationId) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.168.92.145:3306/flink_test',
 'table-name' = 'test3',
 'username' = 'my_user',
 'password' = '123456',
 'sink.buffer-flush.interval' = '1s'
)
"""
# Register the Kafka source table.
t_env.execute_sql(sourceKafkaDdl)
# Register the MySQL result table.
t_env.execute_sql(mysqlSinkDdl)
# Copy the stationId, stationName and passengerNum fields from the Kafka
# source into the MySQL sink.  execute_insert() both defines and submits the
# job; it replaces the deprecated insert_into() + TableEnvironment.execute()
# pair, which was removed in later Flink releases.  wait() keeps the script
# blocked on the running job, matching the old execute() behaviour.
t_env.from_path('sourceKafka')\
    .select("stationId,"
            "stationName,"
            "passengerNum")\
    .execute_insert("test3")\
    .wait()
运行
- 运行文件
pyflink_mysql.py
- 打开kafka,写入数据
root@redis01:/usr/local/kafka# ./bin/kafka-console-producer.sh --broker-list 192.168.92.145:9092 --topic flinkdemo
>1,车站1,1001
- 查看mysql数据库