Flink Cluster With YARN
在YARN部署模式中,有三种部署方式:
- in Application Mode
- in Session Mode
- in a Per-Job Mode (deprecated)
YARN模式
独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。
整体来说,YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配TaskManager 资源。
相关准备和配置
在 Flink1.8.0 之前的版本,想要以 YARN 模式部署 Flink 任务时,需要 Flink 是有 Hadoop 支持的。从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要Hadoop 的环境支持,需要自行在官网下载 Hadoop 相关版本的组件flink-shaded-hadoop-2-uber-2.7.5-10.0.jar, 并将该组件上传至 Flink 的 lib 目录下。在 Flink 1.11.0 版本之后,增加了很多重要新特性,其中就包括增加了对Hadoop3.0.0 以及更高版本Hadoop 的支持,不再提供“flink-shaded-hadoop-*” jar 包,而是通过配置环境变量完成与 YARN 集群的对接。
在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有Hadoop,保证 Hadoop
版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。具体配置步骤如下:
(1)下载并解压安装包,并将解压后的安装包重命名为flink-1.13.0-yarn,本节的相关操作都将默认在此安装路径下执行。
(2)配置环境变量,增加环境变量配置如下:
1 | $ sudo vim /etc/profile.d/my_env.sh |
这里必须保证设置了环境变量HADOOP_CLASSPATH。
(3)启动Hadoop 集群,包括 HDFS 和 YARN
(4)进入 conf 目录,修改 flink-conf.yaml 文件,修改以下配置,若在提交命令中不特定指明,这些配置将作为默认配置。
1 | $ cd /opt/module/flink-1.13.0-yarn/conf/ |
应用模式部署
应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。
(1)执行命令提交作业。
1 | $ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCount |
(2)在命令行中查看或取消作业。
1 | ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY |
(3) 也可以通过yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。
1 | $ ./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" |
这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。
高可用
YARN 模式的高可用和独立模式(Standalone)的高可用原理不一样。
Standalone 模式中, 同时启动多个 JobManager, 一个为“领导者”(leader),其他为“后备”(standby), 当 leader 挂了, 其他的才会有一个成为 leader。
而 YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次启动一个, 所以其实是利用的 YARN 的重试次数来实现的高可用。
(1) 在 yarn-site.xml 中配置。
1 | <property> |
注意: 配置完不要忘记分发, 和重启 YARN。
(2) 在 flink-conf.yaml 中配置。
1 | yarn.application-attempts: 3 |
(3) 启动 yarn-session。
(4) 杀死 JobManager, 查看复活情况。
注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该小于这个值。