Flink12:Python实现Windows环境跑Flink从Kafka向MySQL发数据

Flink12:Python实现Windows环境跑Flink从Kafka向MySQL发数据

版本信息

  • zookeeper:3.7.0
  • Kafka:kafka_2.13-3.1.0
  • kafka-python: 2.0.2
  • MySQL:8.028
  • Flink:Apache Flink 1.14.3(我的Linux中版本,这里没用到)
  • apache-flink: 1.14.4(pyflink)

准备Connector包

Flink默认是没有打包connector的,所以我们需要下载各个connector所需的jar包并放入PyFlink的lib目录。获取pyflink的lib目录
import pyflink import os print(os.path.dirname(os.path.abspath(pyflink.__file__))+'\lib') # 打印出pyflink的lib目录 D:\Python38\lib\site-packages\pyflink\lib
下载以下四个jar包,放在pylink/lib目录下
  • flink-sql-connector-kafka_2.12-1.14.4.jar
  • flink-jdbc_2.12-1.10.3.jar
  • flink-csv-1.14.4-sql-jar.jar
  • mysql-connector-java-8.0.28.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.4/flink-sql-connector-kafka_2.12-1.14.4.jar https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.4/flink-csv-1.14.4-sql-jar.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
notion image
我自己添加了以下几个jar包
notion image

准备

数据库准备

  1. 数据库MySQL数据库,登录数据库,创建对应的数据库、表
# 登录mysql,用户名my_user root@redis01:~# mysql -u my_user -p # 切换到flink,没有这个数据库需要创建 mysql> use flink
  1. 执行以下命令创建表,字段信息
CREATE TABLE IF NOT EXISTS `cdn_access_statistic` ( `province` VARCHAR(100), `access_count` BIGINT, `total_download`BIGINT, `download_speed` DOUBLE, PRIMARY KEY (province) )ENGINE=InnoDB DEFAULT CHARSET=utf8;

Kafka准备

  • 启动zookeeper集群
  • 启动kafka
  • 在kafka中创建对应的topic:flinkdemo

Flink

参考connector参数设置
  • cdn_connector_ddl.py
''' Description: henggao_note version: v1.0.0 Date: 2022-03-13 15:00:20 LastEditors: henggao LastEditTime: 2022-03-15 11:21:15 ''' kafka_source_ddl = """ CREATE TABLE cdn_access_log ( uuid VARCHAR, client_ip VARCHAR, request_time BIGINT, response_size BIGINT, uri VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'flinkdemo', 'properties.zookeeper.connect' = '192.168.92.145:2181', 'properties.bootstrap.servers' = '192.168.92.145:9092', 'properties.group.id' = 'test_Print', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true' ) """ mysql_sink_ddl = """ CREATE TABLE cdn_access_statistic ( province VARCHAR, access_count BIGINT, total_download BIGINT, download_speed DOUBLE, PRIMARY KEY (province) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.92.145:3306/flink', 'table-name' = 'cdn_access_statistic', 'username' = 'my_user', 'password' = '123456', 'sink.buffer-flush.interval' = '1s' ) """
  • cdn_udf.py
''' Description: henggao_note version: v1.0.0 Date: 2022-03-13 15:02:39 LastEditors: henggao LastEditTime: 2022-03-15 10:13:05 ''' import re import json from pyflink.table import DataTypes from pyflink.table.udf import udf from urllib.parse import quote_plus from urllib.request import urlopen @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def ip_to_province(ip): """ format: { "ip": "111.203.130.97", "pro": "北京市", "proCode": "110000", "city": "北京市", "cityCode": "110000", "region": "", "regionCode": "0", "addr": "北京市 联通", "regionNames": "", "err": "" } """ try: urlobj = urlopen( 'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip)) data = str(urlobj.read(), "gbk") pos = re.search("{[^{}]+\}", data).span() geo_data = json.loads(data[pos[0]:pos[1]]) if geo_data['pro']: return geo_data['pro'] else: return geo_data['err'] except: return "UnKnow" ip_to_province()
  • cdn_demo.py.py
''' Description: henggao_note version: v1.0.0 Date: 2022-03-13 15:04:18 LastEditors: henggao LastEditTime: 2022-03-15 10:46:00 ''' import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings from cdn_udf import ip_to_province from cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl from pyflink.common.restart_strategy import RestartStrategies # 创建Table Environment, 并选择使用的Planner env = StreamExecutionEnvironment.get_execution_environment() # 系统自动分配并行度,test模式下无法申请足够的slot 无法运行,所以设置为1 env.set_parallelism(1) # 没有指定重启策略,在本地部署时,不需要指定重启策略。 env.set_restart_strategy(RestartStrategies.no_restart()) t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) # 创建Kafka数据源表 t_env.sql_update(kafka_source_ddl) # t_env.execute_sql(kafka_source_ddl) # 创建MySql结果表 t_env.sql_update(mysql_sink_ddl) # t_env.execute_sql(mysql_sink_ddl) # 注册IP转换地区名称的UDF t_env.register_function("ip_to_province", ip_to_province) # 核心的统计逻辑 t_env.from_path("cdn_access_log")\ .select("uuid, " "client_ip as province, " # IP 转换为地区名称 "response_size, request_time")\ .group_by("province")\ .select( # 计算访问量 "province, count(uuid) as access_count, " # 计算下载总量 "sum(response_size) as total_download, " # 计算下载速度 "sum(response_size) * 1.0 / sum(request_time) as download_speed") \ .insert_into("cdn_access_statistic") # 执行作业 t_env.execute("pyFlink_parse_cdn_log")

运行测试

  1. 先运行cdn_demo.py.py
  1. 再运行kafka生产者,发送数据
# 启动kafka生产者 root@redis01:/usr/local/kafka# ./bin/kafka-console-producer.sh --broker-list 192.168.92.145:9092 --topic flinkdemo # 输入以下数据测试 abcd123,1.1.1.1,307,100000,https://www.aaa.com
notion image
  1. 查看MySQL数据库
notion image
这里用的是stream的api,这个任务只要你不取消他就一直在,
 
 
参考:
不会java但是想用flink,会python就够了 !PyFlink【一、进入pyflink的世界】_JingWeiZ-CSDN博客_flink python
通过一个代码样例开始使用pyflink 通过阅读pyflink的源码,逐步了解flink的python接口实现 本文使用的flink版本和pyflink版本基于1.10.1 flink作为当前最流行的流批统一的数据计算处理框架,其开箱即用的部署方式(standalone)对于刚刚接触flink的人来说是非常友好和吸引人的。你可以通过 地址找到你想要的版本,也可以 直接下载编译好的包来进行下载,当然scala源码包也可以 下载 flink的部署非常简单,如果你下载好了,你可以直接切换到解压后的目录下 并执行./bin/start-cluster.sh,默认端口为8080 浏览器打开访问一下试试,因为我这边8080被占用,flink会自动往后使用端口,所以我这边是8081 PyFlink 是什么?这个问题也许会让人感觉问题的答案太明显了,那就是 Flink + Python,也就是 Flink on Python。那么到底 Flink on Python 意味着这什么呢?那么一个非常容易想到的方面就是能够让 Python 用享受到 Flink 的所有功能。其实不仅如此,PyFlink 的存在还有另外一个非常重要的意义就是,Python on Flink,我们可以将 Python 丰富的生态计算能力运行在 Flink 框架之上,这将极大的推动 Python 生态的发展。其实,如果你再仔细深究一下,你会发现这个结合并非偶然。 Python 生态和大数据生态 Pythoh 生态与大数据生态有密不可分的关系,我们先看看大家都在用 Python 解决什么实际问题?通过一份用户调查我们发现,大多数 Python 用户正在解决 "数据分析","机器学习"的问题,那么这些问题场景在大数据领域也有很好的解决方案。那么 Python 生态和大数据生态结合,抛开扩大大数据产品的受众用户之外,对 Python 生态一个特别重要到意义就是单机到分布式的能力增强,我想,这也是大数据时代海量数据分析对 Python 生态的强需求。
不会java但是想用flink,会python就够了 !PyFlink【一、进入pyflink的世界】_JingWeiZ-CSDN博客_flink python