02-Flink 并行度(Parallelism)
Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/
并行度(Parallelism)
并行子任务和并行度
当要处理的数据量非常大时,我们可以把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
在 Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
例如:如上图所示,当前数据流中有 source、map、window、sink 四个算子,其中 sink 算子的并行度为 1,其他算子的并行度都为 2。所以这段流处理程序的并行度就是 2。
并行度的设置
在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
代码中设置
我们在代码中,可以很简单地在算子后跟着调用 setParallelism()
方法,来设置当前算子的并行度:
1 |
|
这种方式设置的并行度,只针对当前算子有效。
另外,我们也可以直接调用执行环境的 setParallelism()
方法,全局设定并行度:
1 |
|
这样代码中所有算子,默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
这里要注意的是,由于 keyBy 不是算子,所以无法对 keyBy 设置并行度。
提交应用时设置
在使用 flink run 命令提交应用时,可以增加 -p
参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:
1 |
|
如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。
配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml
中直接更改默认并行度:
1 |
|
这个设置对于整个集群上提交的所有作业有效,初始值为 1。无论在代码中设置、还是提交时的 -p
参数,都不是必须的;所以在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数。
并行度优先级
- 对于一个算子,首先看在代码中是否单独指定了它的并行度,这个特定的设置优先级最高,会覆盖后面所有的设置。
- 如果没有单独设置,那么采用当前代码中执行环境全局设置的并行度。
- 如果代码中完全没有设置,那么采用提交时
-p
参数指定的并行度。 - 如果提交时也未指定
-p
参数,那么采用集群配置文件中的默认并行度