Preparing the connector JARs
Flink does not bundle connectors by default, so we need to download the JAR for each connector we use and put it into PyFlink's lib directory. First, locate pyflink's lib directory.
- There is a pitfall here, and I will step into it myself to show what happens.
- My thinking was: since it is Flink that actually runs the job, the JARs should go into Flink's lib directory.
- Then why doesn't the program run with the pyflink that ships with Flink? 🤔 Earlier I had installed pyflink myself, as shown below.
The wrong way
```python
import os
import pyflink

# Print pyflink's lib directory
print(os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), 'lib'))
# D:\Python38\lib\site-packages\pyflink\lib
```
Download the following four JARs and place them in the pyflink/lib directory:
flink-sql-connector-kafka_2.12-1.14.4.jar
flink-jdbc_2.12-1.10.3.jar
flink-csv-1.14.4-sql-jar.jar
mysql-connector-java-8.0.28.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.4/flink-sql-connector-kafka_2.12-1.14.4.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.4/flink-csv-1.14.4-sql-jar.jar
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
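As an aside, copying JARs into a lib directory is not the only option: a PyFlink job can also be pointed at connector JARs through the pipeline.jars configuration. This is only a minimal sketch; the local paths below are assumptions and should point at wherever you downloaded the JARs.

```python
# Alternative to copying JARs into lib: register them per job via pipeline.jars.
# The file paths below are assumptions; adjust them to your downloaded JARs.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Semicolon-separated list of file:// URLs
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/jars/flink-sql-connector-kafka_2.12-1.14.4.jar;"
    "file:///opt/jars/mysql-connector-java-8.0.28.jar")
```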
Enter the directory
/usr/local/lib/python3.8/dist-packages/pyflink/lib/
```
root@redis01:~# cd /usr/local/lib/python3.8/dist-packages/pyflink/lib/
root@redis01:/usr/local/lib/python3.8/dist-packages/pyflink/lib#
```
The default JAR files are as follows.
The following JARs have been added.
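A quick way to double-check that the new JARs are actually in place is to list the directory from Python; the path is the one located above.

```python
# List the JARs currently present in pyflink's lib directory.
import os
import pyflink

lib_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), 'lib')
for name in sorted(os.listdir(lib_dir)):
    print(name)
```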
Distribute them to the other machines, redis02 and redis03:
root@redis01:/usr/local/lib/python3.8/dist-packages/pyflink# xsync lib/
Test code
kafka2mysql.py
```python
import re
import json
from urllib.parse import quote_plus
from urllib.request import urlopen

from pyflink.common.restart_strategy import RestartStrategies
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import udf

kafka_source_ddl = """
CREATE TABLE cdn_access_log (
    uuid VARCHAR,
    client_ip VARCHAR,
    request_time BIGINT,
    response_size BIGINT,
    uri VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'flinkdemo',
    'properties.zookeeper.connect' = '192.168.92.145:2181',
    'properties.bootstrap.servers' = '192.168.92.145:9092',
    'properties.group.id' = 'test_Print',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
    province VARCHAR,
    access_count BIGINT,
    total_download BIGINT,
    download_speed DOUBLE,
    PRIMARY KEY (province) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.92.145:3306/flink',
    'table-name' = 'cdn_access_statistic',
    'username' = 'my_user',
    'password' = '123456',
    'sink.buffer-flush.interval' = '1s'
)
"""


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
    """
    The lookup service returns JSON such as:
    {
        "ip": "111.203.130.97",
        "pro": "北京市",
        "proCode": "110000",
        "city": "北京市",
        "cityCode": "110000",
        "region": "",
        "regionCode": "0",
        "addr": "北京市 联通",
        "regionNames": "",
        "err": ""
    }
    """
    try:
        urlobj = urlopen(
            'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
        data = str(urlobj.read(), "gbk")
        pos = re.search(r"\{[^{}]+\}", data).span()
        geo_data = json.loads(data[pos[0]:pos[1]])
        if geo_data['pro']:
            return geo_data['pro']
        else:
            return geo_data['err']
    except:
        return "UnKnow"


# Create the Table Environment and choose the planner
env = StreamExecutionEnvironment.get_execution_environment()
# Parallelism is assigned automatically; in test mode there are not enough slots, so set it to 1
env.set_parallelism(1)
# No restart strategy is needed for a local deployment
env.set_restart_strategy(RestartStrategies.no_restart())
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# Create the Kafka source table
t_env.sql_update(kafka_source_ddl)
# t_env.execute_sql(kafka_source_ddl)
# Create the MySQL sink table
t_env.sql_update(mysql_sink_ddl)
# t_env.execute_sql(mysql_sink_ddl)

# Register the UDF that converts an IP address to a province name
t_env.register_function("ip_to_province", ip_to_province)

# Core aggregation logic
t_env.from_path("cdn_access_log")\
    .select("uuid, "
            "ip_to_province(client_ip) as province, "  # convert the IP to a province name
            "response_size, request_time")\
    .group_by("province")\
    .select(  # access count
            "province, count(uuid) as access_count, "
            # total download volume
            "sum(response_size) as total_download, "
            # download speed
            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")

# Submit the job
t_env.execute("pyFlink_parse_cdn_log")
```
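Two things are worth checking before submitting the job. First, the JDBC sink only writes into an existing table, so cdn_access_statistic must already exist in the flink database on MySQL. Second, the ip_to_province UDF depends on an external HTTP lookup service; it can be sanity-checked outside Flink with a small script that mirrors the UDF's logic. The sketch below is just such a check; the sample IP is arbitrary.

```python
# Standalone check of the IP lookup used by ip_to_province, run outside Flink.
# Requires network access to whois.pconline.com.cn.
import re
import json
from urllib.parse import quote_plus
from urllib.request import urlopen


def lookup_province(ip):
    raw = urlopen('http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip)).read()
    data = str(raw, 'gbk')
    body = re.search(r"\{[^{}]+\}", data).group(0)
    geo = json.loads(body)
    return geo.get('pro') or geo.get('err')


if __name__ == '__main__':
    print(lookup_province('1.1.1.1'))
```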
Launch in per-job mode:
root@redis01:/usr/local/flink-standalone# ./bin/flink run --target yarn-per-job --python /home/kafka2mysql.py
It fails. The key part of the error is that no connector named 'kafka' could be found, which confirms my earlier guess: the JARs should be placed in Flink's lib directory.
The right way
This is the Flink installation's lib directory:
/usr/local/flink-standalone/lib
Add the required JARs here alongside the default ones.
Distribute them to the other machines, redis02 and redis03:
root@redis01:/usr/local/flink-standalone# xsync ./lib/
Launch in per-job mode again:
root@redis01:/usr/local/flink-standalone# ./bin/flink run --target yarn-per-job --python /home/kafka2mysql.py
Open the Flink web UI in a browser; the job is now running normally.
Start a Kafka console producer and enter some test data:
```
root@redis01:/usr/local/kafka# ./bin/kafka-console-producer.sh --broker-list 192.168.92.145:9092 --topic flinkdemo
>abcd123,1.1.1.1,307,100000,https://www.aaa.com
>abcd123,1.1.1.1,307,100000,https://www.aaa.com
>abcd123,1.1.1.1,307,100000,https://www.aaa.com
```
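If you prefer to generate the test records programmatically instead of typing them into the console producer, a minimal sketch using the kafka-python package (an extra dependency not used elsewhere in this post) could look like this; the broker address and topic match the job's source DDL.

```python
# Minimal test-data producer, assuming kafka-python is installed
# (pip install kafka-python).
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.92.145:9092')

# One CSV record per message: uuid,client_ip,request_time,response_size,uri
for _ in range(3):
    producer.send('flinkdemo', b'abcd123,1.1.1.1,307,100000,https://www.aaa.com')

producer.flush()
producer.close()
```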
Check the database; the statistics are being written.
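A quick way to check the results from Python is to query the sink table directly. This is only a sketch: it assumes the pymysql package is installed and reuses the connection details from the mysql_sink_ddl above.

```python
# Query the sink table, assuming pymysql is installed (pip install pymysql)
# and the credentials match the mysql_sink_ddl in kafka2mysql.py.
import pymysql

conn = pymysql.connect(host='192.168.92.145', user='my_user',
                       password='123456', database='flink')
try:
    with conn.cursor() as cur:
        cur.execute('SELECT province, access_count, total_download, download_speed '
                    'FROM cdn_access_statistic')
        for row in cur.fetchall():
            print(row)
finally:
    conn.close()
```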
The statistics run continuously: the job uses the streaming API, so it keeps running until you cancel it (for example from the web UI or with ./bin/flink cancel <jobId>).