Flink 集群部署模式

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/

1、部署模式

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink 为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。

它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的 main 方法到底在哪里执行——客户端(Client)还是 JobManager。

1.1、会话模式(Session Mode)

会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。

会话模式比较适合于单个规模小、执行时间短的大量作业。

1.2、单作业模式(Per-Job Mode)

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。

作业完成后,集群就会关闭,所有资源也会释放。

这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。

需要注意的是,Flink 本身无法直接运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes(K8S)。

1.3、应用模式(Application Mode)

前面提到的两种模式下,应用代码都是客户端上执行,然后由客户端提交给 JobManager 的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给 JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗

所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManager 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式

应用模式与单作业模式,都是提交作业之后才创建集群;

  • 单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;
  • 应用模式,是直接由 JobManager 执行应用程序的。

这里我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者的场景,具体介绍 Flink 的部署方式。

2、Standalone 运行模式(了解)

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下

2.1、会话模式部署

上一篇文章【Flink 集群搭建及作业提交】 就是 Standalone 集群的会话模式部署。

提前启动集群,并通过 Web 页面客户端提交任务(可以多个任务,但是集群资源固定)。

2.2、单作业模式部署

Flink 的 Standalone 集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台。

2.3、应用模式部署

应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在 bin 目录下的 standalone-job.sh 来创建一个 JobManager。

具体步骤如下:

  1. 环境准备。在 hadoop102 中执行以下命令启动 netcat。

    1
    [lepeng@hadoop102 flink-1.17.0]$ nc -lk 7777
  2. 进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/ 目录下。

    1
    [lepeng@hadoop102 flink-1.17.0]$ mv FlinkTutorial-1.0-SNAPSHOT.jar lib/
  3. 执行以下命令,启动 JobManager。

    1
    [lepeng@hadoop102 flink-1.17.0]$ bin/standalone-job.sh start --job-classname com.lepeng.wc.SocketStreamWordCount

    这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。

  4. 同样是使用 bin 目录下的脚本,启动 TaskManager。

    1
    [lepeng@hadoop102 flink-1.17.0]$ bin/taskmanager.sh start
  5. 在 hadoop102 上模拟发送单词数据。

1
2
[lepeng@hadoop102 ~]$ nc -lk 7777
hello
  1. 在 hadoop102:8081 地址中观察输出数据

  2. 如果希望停掉集群,同样可以使用脚本,命令如下。

    1
    2
    [lepeng@hadoop102 flink-1.17.0]$ bin/taskmanager.sh stop
    [lepeng@hadoop102 flink-1.17.0]$ bin/standalone-job.sh stop

3、YARN 运行模式(重点)

YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。

3.1、相关准备和配置

在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop,保证 Hadoop 版本至少在 2.2 以上,并且集群中安装有HDFS 服务。

具体配置步骤如下:

  1. 配置环境变量,增加环境变量配置如下:

    1
    2
    3
    4
    5
    6
    $ sudo vim /etc/profile.d/my_env.sh

    HADOOP_HOME=/opt/module/hadoop-3.3.4
    export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
    export HADOOP_CLASSPATH=`hadoop classpath`
  2. 启动 Hadoop 集群,包括 HDFS 和 YARN。

    1
    2
    [lepeng@hadoop102 hadoop-3.3.4]$ start-dfs.sh
    [lepeng@hadoop103 hadoop-3.3.4]$ start-yarn.sh
  3. 在 hadoop102 中执行以下命令启动 netcat。

    1
    [lepeng@hadoop102 flink-1.17.0]$ nc -lk 7777

3.2、会话模式部署

YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话(YARN Session)来启动 Flink 集群。具体步骤如下:

  1. 启动集群

    1. 启动 Hadoop 集群(HDFS、YARN)。

    2. 执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。

      1
      [lepeng@hadoop102 flink-1.17.0]$ bin/yarn-session.sh -nm test

    可用参数解读:

    • -d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
    • -jm(–jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
    • -nm(–name):配置在 YARN UI 界面上显示的任务名。
    • -qu(–queue):指定 YARN 队列名。
    • -tm(–taskManager):配置每个 TaskManager 所使用内存。

    注意:Flink1.11.0 版本不再使用 -n 参数和 -s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也不会把集群资源固定,同样是动态分配的。

    YARN Session 启动之后会给出一个 Web UI 地址以及一个 YARN application ID,如下所示,用户可以通过 Web UI 或者命令行两种方式提交作业。

    1
    2
    2.22-11-17 15:20:52,711 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] *   Found Web Interface hadoop104:40825 of application 'application_1668668287070_0005'.
    JobManager Web Interface: http://hadoop104:40825
  2. 提交作业

    1. 通过 Web UI 提交作业

      这种方式比较简单,与上文所述 Standalone 部署模式基本相同。

    2. 通过命令行提交作业

      1. 将 FlinkTutorial-1.0-SNAPSHOT.jar 任务上传至集群。

      2. 执行以下命令将该任务提交到已经开启的 Yarn-Session 中运行。

        1
        2
        [lepeng@hadoop102 flink-1.17.0]$ bin/flink run
        -c com.lepeng.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

        客户端可以自行确定 JobManager 的地址,也可以通过 -m 或者 -jobmanager 参数指定 JobManager 的地址,JobManager 的地址在 YARN Session 的启动页面中可以找到。

      3. 任务提交成功后,可在 YARN 的 Web UI 界面查看运行情况。hadoop103:8088。

        从上图中可以看到我们创建的 Yarn-Session 实际上是一个 Yarn 的 Application,并且有唯一的 Application ID。

      4. 也可以通过 Flink 的 Web UI 页面查看提交任务的运行情况。

3.3、单作业模式部署

在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一个单独的作业,从而启动一个 Flink 集群。

  1. 执行命令提交作业

    1
    [lepeng@hadoop102 flink-1.17.0]$ bin/flink run -d -t yarn-per-job -c com.lepeng.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

    注意:如果启动过程中报如下异常。

    1
    2
    Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders

    解决办法:在 flink 的 /opt/module/flink-1.17.0/conf/flink-conf.yaml 配置文件中设置

    1
    2
    3
    [lepeng@hadoop102 conf]$ vim flink-conf.yaml

    classloader.check-leaked-classloader: false
  2. 在 YARN 的 ResourceManager 界面查看执行情况

    点击可以打开 Flink Web UI 页面进行监控,如下图所示:

  3. 可以使用命令行查看或取消作业,命令如下

    1
    2
    [lepeng@hadoop102 flink-1.17.0]$ bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY    
    [lepeng@hadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

    这里的application_XXXX_YY是当前应用的ID,<jobId>是作业的ID。注意如果取消作业,整个Flink集群也会停掉。

3.4、应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。

  1. 命令行提交

    1. 执行命令提交作业。

      1
      [lepeng@hadoop102 flink-1.17.0]$ bin/flink run-application -t yarn-application -c com.lepeng.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
    2. 在命令行中查看或取消作业。

      1
      2
      3
      [lepeng@hadoop102 flink-1.17.0]$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

      [lepeng@hadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
  2. 上传 HDFS 提交

    可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 Flink 的依赖上传到远程。

    1. 上传 Flink 的 lib 和 plugins 到 HDFS 上

      1
      2
      3
      [lepeng@hadoop102 flink-1.17.0]$ hadoop fs -mkdir /flink-dist
      [lepeng@hadoop102 flink-1.17.0]$ hadoop fs -put lib/ /flink-dist
      [lepeng@hadoop102 flink-1.17.0]$ hadoop fs -put plugins/ /flink-dist
    2. 上传自己的 jar 包到 HDFS

      1
      2
      [lepeng@hadoop102 flink-1.17.0]$ hadoop fs -mkdir /flink-jars
      [lepeng@hadoop102 flink-1.17.0]$ hadoop fs -put FlinkTutorial-1.0-SNAPSHOT.jar /flink-jars
    3. 提交作业

      1
      [lepeng@hadoop102 flink-1.17.0]$ bin/flink run-application -t yarn-application    -Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flink-dist"    -c com.lepeng.wc.SocketStreamWordCount  hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar

    这种方式下,Flink 本身的依赖和用户 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

4、K8S 运行模式(了解)

容器化部署是如今业界流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是 Kubernetes(k8s),而 Flink 也在最近的版本中支持了 k8s 部署模式。基本原理与 YARN 是类似的,具体配置可以参见官网说明,这里我们就不做过多讲解了。


Flink 集群部署模式
https://flepeng.github.io/044-Flink-11-安装和配置-Flink-集群部署模式/
作者
Lepeng
发布于
2021年3月8日
许可协议