前提环境准备:Yarn Session任务模式1. 修改Hadoop的yarn-site.xml配置文件2. 修改Flink配置文件3. 在Yarn中启动Flink集群4. 查看yarn管理界面80885. 提交任务6. 验证Yarn Session的高可用7. 关闭Yarn SessionYarn Per-job 任务模式直接提交 Job关闭Yarn Session提交任务测试1提交任务测试2查看flink run帮助文档Application 模式查看Application帮助文档注意参考
前提
Flink on Yarn前提是HDFS、YARN均启动。
环境准备:
- Java
- Hadoop
- Zookeeper
- Flink
Yarn Session任务模式
- 单个Yarn session模式
这种方式需要先启动集群,然后在提交作业,接着会向yarn申请一块资源空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交,实际工作当中一般不会使用这种模式这种模式,不需要做任何配置,直接将任务提价到yarn集群上面去,我们需要提前启动hdfs以及yarn集群即可启动单个Yarn Session模式。
1. 修改Hadoop的yarn-site.xml配置文件
- redis01上修改Hadoop的yarn-site.xml配置文件
root@redis01:/usr/local/hadoop/etc/hadoop# vim yarn-site.xml
- 内容修改为
<configuration> <!-- Site specific YARN configuration properties --> <!-- 指定ResourceManager的地址 --> <property> <name>yarn.resourcemanager.hostname</name> <value>redis01</value> </property> <!-- reducer取数据的方式是mapreduce_shuffle --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- redis01(JobManager)失败重启的最大尝试次数 --> <property> <name>yarn.resourcemanager.am.max-attempts</name> <value>4</value> <description>The maximum number of application master execution attempts.</description> </property> <!-- 关闭yarn内存检查 --> <!-- 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认为 true --> <!-- 因为对于 flink 使用 yarn 模式下,很容易内存超标,这个时候 yarn 会自动杀掉 job,因此需要关掉--> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> </configuration>
- 是否启动一个线程检查每个人物正在使用虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认为true
- 此处需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job
- 分发配置到其他机器redis02、redis03
root@redis01:/usr/local/hadoop/etc/hadoop# xsync yarn-site.xml
- 重新启动yarn集群即可
# 在namenode服务器执行,我这里是redis01 root@redis01:/usr/local/hadoop/sbin# start-all.sh
2. 修改Flink配置文件
- 修改redis01机器上的flink的flink-conf.yaml配置文件
root@redis01:/usr/local/flink-standalone/conf# vim flink-conf.yaml
- 添加如下内容
# 用户提交作业失败时,重新执行次数 yarn.application-attempts: 4 #设置Task在所有节点平均分配,如果不设置此参数,可能所有task会集中在某一个节点执行,不会在所有TM之间平均分配。 # 官方参数说明:cluster.evenly-spread-out-slots :默认 false。Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available TaskExecutors. cluster.evenly-spread-out-slots: true
- 分发配置到其他机器redis02、redis03
root@redis01:/usr/local/flink-standalone/conf# xsync flink-conf.yaml
3. 在Yarn中启动Flink集群
直接在redis01执行以下命令,在yarn当中启动一个全新的flink集群,可以直接使用yarn-session.sh这个脚本来进行启动
# 主节点中执行 # -tm 表示每个 TaskManager 的内存大小 # -s 表示每个 TaskManager 的 slots 数量 # -d 表示以后台程序方式运行 root@redis01:/usr/local/flink-standalone# bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 1
Tips📢📢📢:yarn-session 命令使用帮助
bin/yarn-session.sh --help
4. 查看yarn管理界面8088
5. 提交任务
使用flink自带的jar包,实现单词计数统计功能 redis01准备文件并上传hdfs。
- 创建测试文件
root@redis01:/home/bigdata/Desktop# vim flink-test.txt
Hello Java Hello Flink Hello Hadoop hello henggao
- hdfs上面创建文件夹并上传文件
root@redis01:/home/bigdata/Desktop# hdfs dfs -mkdir -p /flink_input root@redis01:/home/bigdata/Desktop# hdfs dfs -put flink-test.txt /flink_input
- redis01执行以下命令,提交任务到flink集群
root@redis01:/usr/local/flink-standalone# bin/flink run ./examples/batch/WordCount.jar -input hdfs://redis01:8020/flink_input -output hdfs://redis01:8020/flink_output/flink-test-result.txt
- 查看
- 使用hdfs查看输出结果
root@redis01:/usr/local/flink-standalone# hdfs dfs -text hdfs://redis01:8020/flink_output/flink-test-result.txt
6. 验证Yarn Session的高可用
通过redis01:8088这个界面,查看yarn session启动在哪一台机器上,然后杀死yarn session进程,我们会发现yarn session会重新启动在另外一台机器上面找到YarnSessionClusterEntrypoint所在的服务器,然后杀死该进程。
杀死yarn session进程
root@redis01:/usr/local/flink-standalone# kill -9 1266005
杀死YarnSessionClusterEntrypoint进程之后,会发现yarn集群会重新启动一个YarnSessionClusterEntrypoint进程在其他机器上面
7. 关闭Yarn Session
root@redis01:/usr/local/hadoop/bin# yarn application -kill application_1646963202203_0001
Yarn Per-job 任务模式
多个Yarn-session模式
这种方式的好处是一个任务会对应一个job,即每提交一个作业会根据自身的情况,向yarn申请资源,直到作业执行完成,并不会影响下一个作业的正常运行,除非是yarn上面没有任何资源的情况下。
注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败不需要在yarn当中启动任何集群,直接提交任务即可
直接提交 Job
root@redis01:/usr/local/flink-standalone# bin/flink run -t yarn-per-job --detached examples/batch/WordCount.jar --input hdfs://redis01:8020/flink_input/flink-test.txt
关闭Yarn Session
root@redis01:/usr/local/hadoop/bin# yarn application -kill application_1646963202203_0002
提交任务测试1
—detached
,以分离模式运行作业,detached模式在提交完任务后就退出client
root@redis01:/usr/local/flink-standalone# bin/flink run -t yarn-per-job --detached examples/batch/WordCount.jar --input hdfs://redis01:8020/flink_input/flink-test.txt --output hdfs://redis01:8020/out_result/out_count311.txt
提交任务测试2
- 提交任务
root@redis01:/usr/local/flink-standalone# bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar --input hdfs://redis01:8020/flink_input/flink-test.txt --output hdfs://redis01:8020/out_result/out_count.txt
任务列表和取消任务命令:
# List running job on the cluster ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY # Cancel running job ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
- 查看输出结果
root@redis01:/usr/local/hadoop/bin# hdfs dfs -text hdfs://redis01:8020/out_result/out_count.txt
查看flink run帮助文档
root@redis01:/usr/local/flink-standalone# bin/flink run --help
只有在这些都完成之后,才会通过env.execute()方法触发Flink运行时真正地开始执行作业。如果所有用户都在Deployer上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成JobGraph也需要吃掉更多的CPU和内存,客户端的资源反而会成为瓶颈。不管Session还是Per-Job模式都存在此问题。为了解决,社区在传统部署模式的基础上实现了Application模式。
Application 模式
- 短期模式推荐
Application 模式使用
bin/flink run-application
提交作业;- -t参数用来指定部署目标,目前支持YARN(yarn-application)和K8S(kubernetes-application)。
- -D参数则用来指定与作业相关的各项参数。
- 带有 JM 和 TM 内存设置,设置 TaskManager slots 个数为3,以及指定并发数为3:
./bin/flink run-application -t yarn-application -p 3 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ ./examples/batch/WordCount.jar
- 指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范:
./bin/flink run-application -t yarn-application \ -Dparallelism.default=3 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ ./examples/batch/WordCount.jar
- 和 yarn.provided.lib.dirs 参数一起使用,可以充分发挥 application 部署模式的优势:
使用示例如下:
my-application.jar
是用户 jar 包./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist/lib;hdfs://myhdfs/my-remote-flink-dist/plugins" \ examples/streaming/my-application.jar
my-application.jar
也可以提前上传 hdfs:./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist/lib;hdfs://myhdfs/my-remote-flink-dist/plugins" hdfs://myhdfs/jars/my-application.jar
也可以将
yarn.provided.lib.dirs
配置到 conf/flink-conf.yaml,
这时提交作业就和普通作业没有区别了:./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ /local/path/to/my-application.jar
注意🚀🚀🚀:如果自己指定 yarn.provided.lib.dirs,有以下注意事项:
- 需要将 lib 包和 plugins 包地址用;分开,从上面的例子中也可以看到,将 plugins 包放在 lib 目录下可能会有包冲突错误
- plugins 包路径地址必须以 plugins 结尾,例如上面例子中的 hdfs://myhdfs/my-remote-flink-dist/plugins
- hdfs 路径必须指定 nameservice(或 active namenode 地址),而不能使用简化方式(例如 hdfs:///path/to/lib)
该种模式的操作使得 flink 作业提交变得很轻量,因为所需的 Flink jar 包和应用程序 jar 将到指定的远程位置获取,而不是由客户端下载再发送到集群。这也是社区在 flink-1.11 版本引入新的部署模式的意义所在。
查看Application帮助文档
root@redis01:/usr/local/flink-standalone# bin/flink run-application -h
注意
在之前版本中如果使用的是flink on yarn方式,想切换回standalone模式的话,如果报错需要删除:【/tmp/.yarn-properties-root】
rm -rf /tmp/.yarn-properties-root
因为默认查找当前yarn集群中已有的yarn-session信息中的Jobmanager