Flink13:PyFlink实现实时数据从Kafka消费到MySQL(Windows)

Flink13:PyFlink实现实时数据从Kafka消费到MySQL(Windows)

参数设置参考

PyFlink

pyflink_mysql.py
''' Description: henggao_note version: v1.0.0 Date: 2022-03-13 22:50:02 LastEditors: henggao LastEditTime: 2022-03-15 15:47:10 ''' # -*-coding:UTF-8-*- import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings from pyflink.common.restart_strategy import RestartStrategies # 创建TableEnvironment,并选择使用的Planner env = StreamExecutionEnvironment.get_execution_environment() # 系统自动分配并行度,test模式下无法申请足够的slot 无法运行,所以设置为1 env.set_parallelism(1) # 没有指定重启策略,在本地部署时,不需要指定重启策略。 env.set_restart_strategy(RestartStrategies.no_restart()) t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) sourceKafkaDdl = """ CREATE TABLE sourceKafka ( stationId INT, stationName VARCHAR, passengerNum INT ) WITH ( 'connector' = 'kafka', 'topic' = 'flinkdemo', 'properties.zookeeper.connect' = '192.168.92.145:2181/kafka', 'properties.bootstrap.servers' = '192.168.92.145:9092', 'properties.group.id' = 'test_Print', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true' ) """ # 数据库地址改为自己的,端口也是如此还有数据库名 mysqlSinkDdl = """ CREATE TABLE test3 ( `stationId` INT, `stationName` VARCHAR, `passengerNum` INT, PRIMARY KEY (stationId) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.92.145:3306/flink_test', 'table-name' = 'test3', 'username' = 'my_user', 'password' = '123456', 'sink.buffer-flush.interval' = '1s' ) """ # 创建Kafka数据源表 t_env.execute_sql(sourceKafkaDdl) # 创建MySql结果表 t_env.execute_sql(mysqlSinkDdl) # 取ID,TRUCK_ID两个字段写入到mysql t_env.from_path('sourceKafka')\ .select("stationId," "stationName," "passengerNum")\ .insert_into("test3") # 执行作业 t_env.execute("pyFlink_mysql")

运行

  1. 运行文件pyflink_mysql.py
  1. 打开kafka,写入数据
root@redis01:/usr/local/kafka# ./bin/kafka-console-producer.sh --broker-list 192.168.92.145:9092 --topic flinkdemo >1,车站1,1001
notion image
  1. 查看mysql数据库
notion image