Flink 集群搭建及作业提交初体验
Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/
1、集群角色
Flink 提供作业和执行任务,需要几个关键组件:
- Client(客户端):代码由客户端获取并做转换,之后提交给JobManager
- JobManager:就是 Flink 集群里的 “管事人“,对作业进行中央调度;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的 TaskManager
- TaskManager:就是真正”干活的人“,数据的处理操作都是它们来做的。
注意:Flink 是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。所以接下来我们会先做一个简单的介绍,让大家有一个初步的认识,之后再展开讲述不同情形下的 Flink 部署。
2、Flink集群搭建
2.1、集群启动
集群规划
节点服务器 hadoop102 Hadoop103 Hadoop104 角色 JobManager
TaskManagerTaskManager TaskManager 下载并解压安装包
下载安装包 flink-1.17.0-bin-scala_2.12.tgz,将该 jar 包上传到 hadoop102节点服务器的
/opt/software
路径上。在
/opt/software
路径上解压 flink-1.17.0-bin-scala_2.12.tgz 到/opt/module
路径上。1
[lepeng@hadoop102 software]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
修改集群配置
进入 conf 路径,修改
flink-conf.yaml
文件,指定 hadoop102 节点服务器为 JobManager1
[lepeng@hadoop102 conf]$ vim flink-conf.yaml
修改如下内容:
1
2
3
4
5
6
7
8# JobManager节点地址.
jobmanager.rpc.address: hadoop102
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop102
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop102修改 workers 文件,指定 hadoop102、hadoop103 和 hadoop104 为 TaskManager
1
[lepeng@hadoop102 conf]$ vim workers
修改如下内容:
1
2
3hadoop102
hadoop103
hadoop104修改 masters 文件
1
[lepeng@hadoop102 conf]$ vim masters
修改如下内容:
1
hadoop102:8081
另外,在
flink-conf.yaml
文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化配置,主要配置项如下:jobmanager.memory.process.size
:对 JobManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。taskmanager.memory.process.size
:对 TaskManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。taskmanager.numberOfTaskSlots
:对每个 TaskManager 能够分配的 Slot 数量进行配置,默认为1,可根据TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。所谓 Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。parallelism.default
:Flink 任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
分发安装目录
配置修改完毕后,将 Flink 安装目录发给另外两个节点服务器。
1
[lepeng@hadoop102 module]$ xsync flink-1.17.0/
修改 hadoop103 的
taskmanager.host
1
[lepeng@hadoop103 conf]$ vim flink-conf.yaml
修改如下内容:
1
2# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: hadoop103修改 hadoop104 的 taskmanager.host
1
[lepeng@hadoop104 conf]$ vim flink-conf.yaml
修改如下内容:
1
2# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: hadoop104
启动集群
在 hadoop102 节点服务器上执行
start-cluster.sh
启动 Flink 集群:1
[lepeng@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
查看进程情况:
1
2
3
4
5
6
7
8
9
10
11[lepeng@hadoop102 flink-1.17.0]$ jpsall
=============== hadoop102 ===============
4453 StandaloneSessionClusterEntrypoint
4458 TaskManagerRunner
4533 Jps
=============== hadoop103 ===============
2872 TaskManagerRunner
2941 Jps
=============== hadoop104 ===============
2948 Jps
2876 TaskManagerRunner
访问 WebUI
启动成功后,可以访问 http://hadoop102:8081 对 Flink 集群和任务进行监控管理。
这里可以明显看到,当前集群的 TaskManager 数量为 3;由于默认每个 TaskManager 的 Slot 数量为 1,所以总 Slot 数和可用 Slot 数都为 3。
2.2、向集群提交作业
环境准备
在 hadoop102 中执行以下命令启动netcat。
1
[lepeng@hadoop102 flink-1.17.0]$ nc -lk 7777
程序打包
在我们编写的 Flink 入门程序的 pom.xml 文件中添加打包插件的配置,具体如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>插件配置完毕后,可以使用 IDEA 的 Maven 工具执行 package 命令,出现如下提示即表示打包成功。
1
2
3-------------------------------------------------------------------
[INFO] BUILD SUCCESS
-------------------------------------------------------------------打包完成后,在 target 目录下即可找到所需 JAR 包,JAR 包会有两个,
FlinkTutorial-1.0-SNAPSHOT.jar
和FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar
,因为集群中已经具备任务运行所需的所有依赖,所以**建议使用FlinkTutorial-1.0-SNAPSHOT.jar
**。
在 WebUI 上提交作业
任务打包完成后,我们打开 Flink 的 WEB UI 页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的 JAR 包,如下图所示。
点击该 JAR 包,出现任务配置页面,进行相应配置。
主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。
任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况。
测试
在 socket 端口中输入 hello
1
2[lepeng@hadoop102 flink-1.17.0]$ nc -lk 7777
hello先点击 Task Manager,然后点击右侧的 192.168.10.104 服务器节点
点击 Stdout,就可以看到 hello 单词的统计
注意:如果 hadoop104 节点没有统计单词数据,可以去其他 TaskManager 节点查看。
点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行。
命令行提交作业
除了通过 WEB UI 界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把 jar 包直接上传到目录 flink-1.17.0 下
首先需要启动集群。
1
[lepeng@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
在 hadoop102 中执行以下命令启动 netcat。
1
[lepeng@hadoop102 flink-1.17.0]$ nc -lk 7777
将 Flink 程序运行 jar 包上传到
/opt/module/flink-1.17.0
路径。进入到 Flink 的安装路径下,在命令行使用 flink run 命令提交作业。
1
[lepeng@hadoop102 flink-1.17.0]$ bin/flink run -m hadoop102:8081 -c com.lepeng.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
这里的参数
-m
指定了提交到的 JobManager,-c
指定了入口类。在浏览器中打开 Web UI,http://hadoop102:8081 查看应用执行情况。
用 netcat 输入数据,可以在 TaskManager 的标准输出(Stdout)看到对应的统计结果。
在
/opt/module/flink-1.17.0/log
路径中,可以查看 TaskManager 节点。1
2
3
4
5
6[lepeng@hadoop102 log]$ cat flink-lepeng-standalonesession-0-hadoop102.out
(hello,1)
(hello,2)
(flink,1)
(hello,3)
(scala,1)