Version information
- ZooKeeper: 3.7.0
- Kafka: kafka_2.13-3.1.0
- kafka-python: 2.0.2
- MySQL: 8.0.28
- Flink: Apache Flink 1.14.3 (the version on my Linux machine; not used here)
- apache-flink: 1.14.4 (PyFlink)
Preparing the connector packages
Flink does not bundle connectors by default, so we need to download the JARs each connector requires and put them into PyFlink's lib directory. First, locate PyFlink's lib directory:
```python
import os
import pyflink

# Print PyFlink's lib directory
print(os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), 'lib'))
# Output: D:\Python38\lib\site-packages\pyflink\lib
```
Download the following four JARs and place them in the pyflink/lib directory:
- flink-sql-connector-kafka_2.12-1.14.4.jar
- flink-jdbc_2.12-1.10.3.jar
- flink-csv-1.14.4-sql-jar.jar
- mysql-connector-java-8.0.28.jar

Download links:
- https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.4/flink-sql-connector-kafka_2.12-1.14.4.jar
- https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar
- https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.4/flink-csv-1.14.4-sql-jar.jar
- https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
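The downloads can also be scripted. A minimal sketch, assuming the lib directory resolved above and the four URLs listed here:

```python
import os
import pyflink
from urllib.request import urlretrieve

# PyFlink's lib directory, resolved the same way as above
lib_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), 'lib')

jar_urls = [
    'https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.4/flink-sql-connector-kafka_2.12-1.14.4.jar',
    'https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar',
    'https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.4/flink-csv-1.14.4-sql-jar.jar',
    'https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar',
]

for url in jar_urls:
    # Save each JAR under its original file name into the lib directory
    urlretrieve(url, os.path.join(lib_dir, url.rsplit('/', 1)[-1]))
```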
The JARs above are the ones I added to my environment myself.
Preparation
Database setup
- Log in to MySQL and create the required database and table
```shell
# Log in to MySQL as user my_user
root@redis01:~# mysql -u my_user -p
# Switch to the flink database (create it first if it does not exist)
mysql> use flink
```
- Run the following statement to create the table with these fields
```sql
CREATE TABLE IF NOT EXISTS `cdn_access_statistic` (
  `province` VARCHAR(100),
  `access_count` BIGINT,
  `total_download` BIGINT,
  `download_speed` DOUBLE,
  PRIMARY KEY (province)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
Kafka setup
- Start the ZooKeeper cluster
- Start Kafka
- Create the topic used in this demo: flinkdemo (a Python alternative is sketched after this list)
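The topic can also be created from Python with the kafka-python package from the version list. A minimal sketch, assuming the broker address used throughout this post and a single-broker test setup:

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the broker used throughout this post
admin = KafkaAdminClient(bootstrap_servers='192.168.92.145:9092')

# One partition and replication factor 1 are assumptions for a single-broker test setup
admin.create_topics([NewTopic(name='flinkdemo', num_partitions=1, replication_factor=1)])
admin.close()
```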
Flink
Refer to the connector documentation for the parameter settings.
cdn_connector_ddl.py
```python
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-13 15:00:20
LastEditors: henggao
LastEditTime: 2022-03-15 11:21:15
'''
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
    uuid VARCHAR,
    client_ip VARCHAR,
    request_time BIGINT,
    response_size BIGINT,
    uri VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'flinkdemo',
    'properties.zookeeper.connect' = '192.168.92.145:2181',
    'properties.bootstrap.servers' = '192.168.92.145:9092',
    'properties.group.id' = 'test_Print',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
    province VARCHAR,
    access_count BIGINT,
    total_download BIGINT,
    download_speed DOUBLE,
    PRIMARY KEY (province) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.92.145:3306/flink',
    'table-name' = 'cdn_access_statistic',
    'username' = 'my_user',
    'password' = '123456',
    'sink.buffer-flush.interval' = '1s'
)
"""
```
cdn_udf.py
```python
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-13 15:02:39
LastEditors: henggao
LastEditTime: 2022-03-15 10:13:05
'''
import re
import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
    """
    Response format of the lookup service:
    {
        "ip": "111.203.130.97",
        "pro": "北京市",
        "proCode": "110000",
        "city": "北京市",
        "cityCode": "110000",
        "region": "",
        "regionCode": "0",
        "addr": "北京市 联通",
        "regionNames": "",
        "err": ""
    }
    """
    try:
        # Query a public IP geolocation service and extract the JSON payload
        urlobj = urlopen(
            'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
        data = str(urlobj.read(), "gbk")
        pos = re.search(r"{[^{}]+\}", data).span()
        geo_data = json.loads(data[pos[0]:pos[1]])
        if geo_data['pro']:
            return geo_data['pro']
        else:
            return geo_data['err']
    except Exception:
        return "UnKnow"
```
cdn_demo.py
```python
'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-13 15:04:18
LastEditors: henggao
LastEditTime: 2022-03-15 10:46:00
'''
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.common.restart_strategy import RestartStrategies
from cdn_udf import ip_to_province
from cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

# Create the Table Environment and choose the planner
env = StreamExecutionEnvironment.get_execution_environment()
# With automatic parallelism the local/test setup cannot get enough slots, so set it to 1
env.set_parallelism(1)
# No restart strategy is needed for a local deployment
env.set_restart_strategy(RestartStrategies.no_restart())
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# Create the Kafka source table
t_env.sql_update(kafka_source_ddl)
# t_env.execute_sql(kafka_source_ddl)
# Create the MySQL sink table
t_env.sql_update(mysql_sink_ddl)
# t_env.execute_sql(mysql_sink_ddl)

# Register the UDF that maps an IP to a region name
t_env.register_function("ip_to_province", ip_to_province)

# Core aggregation logic
t_env.from_path("cdn_access_log")\
    .select("uuid, "
            # client_ip is kept as province here; the registered ip_to_province UDF could map the IP to a region name
            "client_ip as province, "
            "response_size, request_time")\
    .group_by("province")\
    .select(
        # access count
        "province, count(uuid) as access_count, "
        # total download volume
        "sum(response_size) as total_download, "
        # download speed
        "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")

# Run the job
t_env.execute("pyFlink_parse_cdn_log")
```
Running the test
- Run cdn_demo.py first
- Then start the Kafka console producer and send test data (a kafka-python alternative is sketched after this list)
```shell
# Start the Kafka console producer
root@redis01:/usr/local/kafka# ./bin/kafka-console-producer.sh --broker-list 192.168.92.145:9092 --topic flinkdemo
# Enter the following record to test
abcd123,1.1.1.1,307,100000,https://www.aaa.com
```
- Check the MySQL database for the aggregated results
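Instead of the console producer above, the same test record can be sent with the kafka-python package from the version list. A minimal sketch, assuming the broker address used throughout this post:

```python
from kafka import KafkaProducer

# Connect to the broker that the Flink Kafka source reads from
producer = KafkaProducer(bootstrap_servers='192.168.92.145:9092')

# Send one CSV record matching the cdn_access_log schema: uuid, client_ip, request_time, response_size, uri
producer.send('flinkdemo', b'abcd123,1.1.1.1,307,100000,https://www.aaa.com')
producer.flush()
producer.close()
```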
This job uses the streaming API, so it keeps running until you cancel it.
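For a quick check of the sink table from Python, here is a minimal sketch; it assumes the pymysql package (not part of the version list above) and the credentials from the JDBC DDL:

```python
import pymysql

# Connect with the same credentials as the JDBC sink
conn = pymysql.connect(host='192.168.92.145', user='my_user',
                       password='123456', database='flink')
with conn.cursor() as cur:
    cur.execute("SELECT province, access_count, total_download, download_speed "
                "FROM cdn_access_statistic")
    for row in cur.fetchall():
        print(row)
conn.close()
```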
References: