Flink14:集群下使用Python文件(Linux)

Flink14:集群下使用Python文件(Linux)

测试1

  • adder.py
from pyflink.table import EnvironmentSettings, TableEnvironment # 创建 TableEnvironment env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings) table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) # 使用 logging 模块 import logging logging.warning(table.get_schema()) # 使用 print 函数 print(table.get_schema())
集群运行adder.py
bin/flink run -t yarn-per-job --detached \ -Dparallelism.default=3 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount1" \ -Dtaskmanager.numberOfTaskSlots=3 \ -py /home/adder.py
成功显示
notion image

测试2:Yarn-session

  1. 启动
# -d 后台模式 # -jm jobmanager内存大小 # -tm taskmanager内存大小 # -s slot数 bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 3
  1. 访问:http://redis01:8088/
notion image
  1. 该模式下提交
bin/flink run -py examples/python/table/word_count.py
  1. 进入查看
notion image
notion image
  1. 查看日志,发现没有Pyflink
notion image

安装PyFlink

我刚开始认为安装了Flink顺带就安装了apache-flink😅
集群所有机器安装pyflink
root@redis01:~# pip install apache-flink
全部安装好了,再次运行
bin/flink run -py examples/python/table/word_count.py
notion image
发现刚刚安装apache-flink ,protobuf版本错误
notion image
pip install -U protobuf
notion image
pip install protobuf==3.17.3
所有机器全部安装好了,再次运行,成功了。
bin/flink run -py examples/python/table/word_count.py
notion image

关闭Yarn Sesssion

root@redis01:/usr/local/hadoop/bin# yarn application -kill application_1647760205805_0001
notion image

测试3:Per-job

案例一:

Per-job模式启动Flink集群,任务结束后会自动关闭Flink集群。
# 测试table/word_count.py root@redis01:/usr/local/flink-standalone# ./bin/flink run --target yarn-per-job --python examples/python/table/word_count.py # 测试datastream/word_count.py root@redis01:/usr/local/flink-standalone# ./bin/flink run --target yarn-per-job --python examples/python/datastream/word_count.py
也可以指定其他参数
bin/flink run -t yarn-per-job -yjm 1024 -ytm 1024 -py examples/python/table/word_count.py
简写也可以成功运行,加了--detached会报错,我还没搞为什么这两种方式。不可以后台运行?还是已经关闭了?🤔🤔🤔
root@redis01:/usr/local/flink-standalone# bin/flink run -t yarn-per-job --detached -py examples/python/table/word_count.py
notion image

案例二:

测试统计数据hdfs上的flink-test.txt ,并输出结果到hdfs上
root@redis01:/usr/local/flink-standalone# bin/flink run -t yarn-per-job -py examples/python/table/word_count.py --input hdfs://redis01:8020/flink_input/flink-test.txt --output hdfs://redis01:8020/out_result/out_count325.txt
查看结果
# 查看所有文件 root@redis01:~# hadoop fs -ls -R / # 查看结果,结果这是一个目录😅 root@redis01:/usr/local/flink-standalone# hadoop fs -cat /out_result/out_count325.txt # 查看文件,继续查看文件 root@redis01:/usr/local/flink-standalone# hadoop fs -cat /out_result/out_count325.txt/part-3f7d65e9-0bc2-40d1-9489-d028e54416b9-0-0
notion image
 

测试4:Application

参数
说明
-Dkubernetes.cluster-id=<ClusterId>
K8S上的j集群id
./bin/flink run-application \ --target kubernetes-application \ --parallelism 8 \ -Dkubernetes.cluster-id=<ClusterId> \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dkubernetes.container.image=<PyFlinkImageName> \ --pyModule word_count \ --pyFiles /opt/flink/examples/python/table/batch/word_count.py
 
./bin/flink run-application -t yarn-application \ -Dparallelism.default=3 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ --python ./examples/python/table/word_count.py
这种方式运行,需要jar包,python文件好像不行😕😕😕
notion image
运行以下命令报错:
./bin/flink run --target yarn-application --python examples/python/table/word_count.py
notion image