Flink7:Yarn 模式

Flink7:Yarn 模式

 

前提

Flink on Yarn前提是HDFS、YARN均启动。

环境准备:

  • Java
  • Hadoop
  • Zookeeper
  • Flink
 

Yarn Session任务模式

  • 单个Yarn session模式
notion image
这种方式需要先启动集群,然后在提交作业,接着会向yarn申请一块资源空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交,实际工作当中一般不会使用这种模式这种模式,不需要做任何配置,直接将任务提价到yarn集群上面去,我们需要提前启动hdfs以及yarn集群即可启动单个Yarn Session模式。

1. 修改Hadoop的yarn-site.xml配置文件

  1. redis01上修改Hadoop的yarn-site.xml配置文件
root@redis01:/usr/local/hadoop/etc/hadoop# vim yarn-site.xml
  1. 内容修改为
<configuration> <!-- Site specific YARN configuration properties --> <!-- 指定ResourceManager的地址 --> <property> <name>yarn.resourcemanager.hostname</name> <value>redis01</value> </property> <!-- reducer取数据的方式是mapreduce_shuffle --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- redis01(JobManager)失败重启的最大尝试次数 --> <property> <name>yarn.resourcemanager.am.max-attempts</name> <value>4</value> <description>The maximum number of application master execution attempts.</description> </property> <!-- 关闭yarn内存检查 --> <!-- 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认为 true --> <!-- 因为对于 flink 使用 yarn 模式下,很容易内存超标,这个时候 yarn 会自动杀掉 job,因此需要关掉--> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> </configuration>
  • 是否启动一个线程检查每个人物正在使用虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认为true
  • 此处需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job
notion image
  1. 分发配置到其他机器redis02、redis03
root@redis01:/usr/local/hadoop/etc/hadoop# xsync yarn-site.xml
  1. 重新启动yarn集群即可
# 在namenode服务器执行,我这里是redis01 root@redis01:/usr/local/hadoop/sbin# start-all.sh

2. 修改Flink配置文件

  1. 修改redis01机器上的flink的flink-conf.yaml配置文件
root@redis01:/usr/local/flink-standalone/conf# vim flink-conf.yaml
  1. 添加如下内容
# 用户提交作业失败时,重新执行次数 yarn.application-attempts: 4 #设置Task在所有节点平均分配,如果不设置此参数,可能所有task会集中在某一个节点执行,不会在所有TM之间平均分配。 # 官方参数说明:cluster.evenly-spread-out-slots :默认 false。Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available TaskExecutors. cluster.evenly-spread-out-slots: true
notion image
  1. 分发配置到其他机器redis02、redis03
root@redis01:/usr/local/flink-standalone/conf# xsync flink-conf.yaml

3. 在Yarn中启动Flink集群

直接在redis01执行以下命令,在yarn当中启动一个全新的flink集群,可以直接使用yarn-session.sh这个脚本来进行启动
# 主节点中执行 # -tm 表示每个 TaskManager 的内存大小 # -s 表示每个 TaskManager 的 slots 数量 # -d 表示以后台程序方式运行 root@redis01:/usr/local/flink-standalone# bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 1
Tips📢📢📢:yarn-session 命令使用帮助
bin/yarn-session.sh --help

4. 查看yarn管理界面8088

notion image

5. 提交任务

使用flink自带的jar包,实现单词计数统计功能 redis01准备文件并上传hdfs。
  1. 创建测试文件
root@redis01:/home/bigdata/Desktop# vim flink-test.txt
Hello Java Hello Flink Hello Hadoop hello henggao
  1. hdfs上面创建文件夹并上传文件
root@redis01:/home/bigdata/Desktop# hdfs dfs -mkdir -p /flink_input root@redis01:/home/bigdata/Desktop# hdfs dfs -put flink-test.txt /flink_input
  1. redis01执行以下命令,提交任务到flink集群
root@redis01:/usr/local/flink-standalone# bin/flink run ./examples/batch/WordCount.jar -input hdfs://redis01:8020/flink_input -output hdfs://redis01:8020/flink_output/flink-test-result.txt
  1. 查看
  • 使用hdfs查看输出结果
root@redis01:/usr/local/flink-standalone# hdfs dfs -text hdfs://redis01:8020/flink_output/flink-test-result.txt
notion image
notion image
notion image
 

6. 验证Yarn Session的高可用

通过redis01:8088这个界面,查看yarn session启动在哪一台机器上,然后杀死yarn session进程,我们会发现yarn session会重新启动在另外一台机器上面找到YarnSessionClusterEntrypoint所在的服务器,然后杀死该进程。
notion image
杀死yarn session进程
notion image
root@redis01:/usr/local/flink-standalone# kill -9 1266005
杀死YarnSessionClusterEntrypoint进程之后,会发现yarn集群会重新启动一个YarnSessionClusterEntrypoint进程在其他机器上面
notion image
notion image

7. 关闭Yarn Session

notion image
notion image
root@redis01:/usr/local/hadoop/bin# yarn application -kill application_1646963202203_0001
notion image

Yarn Per-job 任务模式

多个Yarn-session模式
这种方式的好处是一个任务会对应一个job,即每提交一个作业会根据自身的情况,向yarn申请资源,直到作业执行完成,并不会影响下一个作业的正常运行,除非是yarn上面没有任何资源的情况下。 注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败不需要在yarn当中启动任何集群,直接提交任务即可
 
notion image

直接提交 Job

root@redis01:/usr/local/flink-standalone# bin/flink run -t yarn-per-job --detached examples/batch/WordCount.jar --input hdfs://redis01:8020/flink_input/flink-test.txt
notion image

关闭Yarn Session

root@redis01:/usr/local/hadoop/bin# yarn application -kill application_1646963202203_0002

提交任务测试1

  • —detached,以分离模式运行作业,detached模式在提交完任务后就退出client
root@redis01:/usr/local/flink-standalone# bin/flink run -t yarn-per-job --detached examples/batch/WordCount.jar --input hdfs://redis01:8020/flink_input/flink-test.txt --output hdfs://redis01:8020/out_result/out_count311.txt

提交任务测试2

  1. 提交任务
root@redis01:/usr/local/flink-standalone# bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar --input hdfs://redis01:8020/flink_input/flink-test.txt --output hdfs://redis01:8020/out_result/out_count.txt
任务列表和取消任务命令:
# List running job on the cluster ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY # Cancel running job ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
  1. 查看输出结果
root@redis01:/usr/local/hadoop/bin# hdfs dfs -text hdfs://redis01:8020/out_result/out_count.txt
notion image

查看flink run帮助文档

root@redis01:/usr/local/flink-standalone# bin/flink run --help
 
只有在这些都完成之后,才会通过env.execute()方法触发Flink运行时真正地开始执行作业。如果所有用户都在Deployer上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成JobGraph也需要吃掉更多的CPU和内存,客户端的资源反而会成为瓶颈。不管Session还是Per-Job模式都存在此问题。为了解决,社区在传统部署模式的基础上实现了Application模式。

Application 模式

  • 短期模式推荐
Application 模式使用 bin/flink run-application 提交作业;
  • -t参数用来指定部署目标,目前支持YARN(yarn-application)和K8S(kubernetes-application)。
  • -D参数则用来指定与作业相关的各项参数。
 
  1. 带有 JM 和 TM 内存设置,设置 TaskManager slots 个数为3,以及指定并发数为3:
./bin/flink run-application -t yarn-application -p 3 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ ./examples/batch/WordCount.jar
  1. 指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范:
./bin/flink run-application -t yarn-application \ -Dparallelism.default=3 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ ./examples/batch/WordCount.jar
notion image
notion image
 
  1. 和 yarn.provided.lib.dirs 参数一起使用,可以充分发挥 application 部署模式的优势:
使用示例如下:my-application.jar 是用户 jar 包
./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist/lib;hdfs://myhdfs/my-remote-flink-dist/plugins" \ examples/streaming/my-application.jar
my-application.jar 也可以提前上传 hdfs:
./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist/lib;hdfs://myhdfs/my-remote-flink-dist/plugins" hdfs://myhdfs/jars/my-application.jar
也可以将 yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml,这时提交作业就和普通作业没有区别了:
./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ /local/path/to/my-application.jar
注意🚀🚀🚀:如果自己指定 yarn.provided.lib.dirs,有以下注意事项:
  • 需要将 lib 包和 plugins 包地址用;分开,从上面的例子中也可以看到,将 plugins 包放在 lib 目录下可能会有包冲突错误
  • plugins 包路径地址必须以 plugins 结尾,例如上面例子中的 hdfs://myhdfs/my-remote-flink-dist/plugins
  • hdfs 路径必须指定 nameservice(或 active namenode 地址),而不能使用简化方式(例如 hdfs:///path/to/lib)
该种模式的操作使得 flink 作业提交变得很轻量,因为所需的 Flink jar 包和应用程序 jar 将到指定的远程位置获取,而不是由客户端下载再发送到集群。这也是社区在 flink-1.11 版本引入新的部署模式的意义所在。

查看Application帮助文档

root@redis01:/usr/local/flink-standalone# bin/flink run-application -h

注意

在之前版本中如果使用的是flink on yarn方式,想切换回standalone模式的话,如果报错需要删除:【/tmp/.yarn-properties-root】
rm -rf /tmp/.yarn-properties-root
因为默认查找当前yarn集群中已有的yarn-session信息中的Jobmanager

参考

Flink 安装部署与快速入门_章鱼哥TuNan&Z的博客-CSDN博客
不仅仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。: 有状态的计算、 支持强一致性语义以及至此 基于Event Time的WaterMark 对延迟或者乱序的数据进行处理 物理部署层 本地、独立集群、yarn管理集群上、云上 runtime核心层 该层为上层不同接口提供基础服务;也是flink分布式计算框架的核心实现层支持分布式stream作业的执行、jobgraph到executiongraph的映射转换、任务调度等。将datasteam和dataset转换成统一的可执行的task operator api&libraries层 Flink 首先支持了 Scala 和 Java 的 API,Python 也正在测试中; DataStream、DataSet、Table、SQL API,作为分布式数据处理框架,Flink同时提供了支撑 流计算和批计算的接口,两者都提供给用户丰富的数据处理高级API 扩展库 Flink 还包括用于复杂事件处理的CEP、机器学习库FlinkML、图处理库Gelly等。 checkpoint:检查点 基于chandy-lamport算法实现了一个分布式的一致性的快照,从而提供一致性的快照,从而提供了一致性的语义; 状态 startapi:valuestate、liststate、mapstate,近期添加了broadcaststate ​ ​ 事件驱动型应用是一类具有状态的应用,它**从一个或者多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。**事件驱动型应用是基于状态化流处理来完成 ,数据和计算不会分离,应用只需要访问本地(内存或磁盘)即可获取数据。 数据分析任务需要从原始数据中提取有价值的信息和指标。 batch analytics可以理解为周期性查询 Batch Analytics 就是传统意义上使用类似于Map Reduce、Hive、Spark Batch 等, 对作业进行分析、处理、生成离线报表 比如Flink应用凌晨从Recorded Events中读取昨天的数据,然后做周期查询运算,最后 将数据写入Database或者HDFS,或者直接将数据生成报表供公司上层领导决策使用。 Streaming analytics可以理解为连续性查询 比如实时展示双十一天猫销售GMV(Gross
Flink 安装部署与快速入门_章鱼哥TuNan&Z的博客-CSDN博客
FLINK-启动命令2(Application 模式) -Dyarn.provided.lib.dirs_CarloPan的博客-CSDN博客
1. 背景 flink-1.11 引入了一种新的部署模式,即 Application 模式。目前,flink-1.11 已经可以支持基于 Yarn 和 Kubernetes 的 Application 模式。 2. 优势 Session模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。 Per-Job模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。 通过以上两种模式的特点描述,可以看出,main方法都是在客户端执行,社区考虑到在客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,并将依赖项和 JobGraph 发送到集群的一系列过程中,由于需要大量的网络带宽下载依赖项并将二进制文件发送到集群,会造成客户端消耗大量的资源。尤其在大量用户共享客户端时,问题更加突出。因此,社区提出新的部署方式 Application 模式解决该问题。 3. 原理 Application 模式下,用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。 4.
FLINK-启动命令2(Application 模式) -Dyarn.provided.lib.dirs_CarloPan的博客-CSDN博客