12-Flink 流控和反压

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/
Flink 反压:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/monitoring/back_pressure/

网络流控

为什么要进行流量控制

假设 Producer 和 Consumer 的 吞吐率分别为 2MB/s、1MB/s,生产者速度比消费者快 1MB/s。假设两端都有一个 Buffer,网络端的吞吐率是 2MB/s。过了 5s 后 Receive Buffer 可能就撑不住了,这时候会面临两种情况:

  • 如果 Receive Buffer 是有界的,这时候新到达的数据就只能被丢弃掉了。
  • 如果 Receive Buffer 是无界的,Receive Buffer 会持续的扩张,最终会导致 Consumer 的内存耗尽。

网络流量控制实现-静态限速

传统的做法是在发送端实现一个类似 Rate Limiter 这样的静态限流,发送端经过限流层之后以 1MB/s 的吞吐率发往接收端,这样就解决了 Producer 和 Consumer 处理速率不一致的情况;

方案存在的缺点:

  • 事先无法预估 Consumer 到底能承受多大的速率
  • Consumer 的承受能力通常会动态地波动

网络流量控制实现-动态反馈

动态反馈需要 Consumer 能够及时的给 Producer 进行反馈,告知 Producer 当前能够承受的速率是多少。动态反馈分为两种:

  • 负反馈:接收速率 < 发送速率时,告知 Producer 降低发送速率;
  • 正反馈:发送速率 > 接收速率时,告知 Producer 提高发送速率。

Flink 的反压分为两个阶段,一个是1.5版本之前,一个是1.5版本以后

  • Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压
  • Flink 1.5 之后实现了自己托管的 credit-based 流控机制,在应用层模拟 TCP 的流控机制

在1.5版本以前

在1.5版本以前,Flink 的反压是通过 TCP 的反压机制来控制的

我们来看下 Flink 的网络传输

发送端发送数据前会经历自己内部的三层缓冲,(Flink)Network Buffer -> (Netty)ChannelOutbound Buffer -> (Socket)Send Buffer;同样在接收端也会有三层 Buffer。

最终数据是经过 TCP 发送,TCP 有流量控制机制,实际上 Flink(before V1.5)就是通过 TCP 的流控机制来实现 feedback 的。

TCP 的反压,是通过 callback 实现的,当 socket 发送数据去 receive buffer 后,receiver 会反馈给 send 端,目前 receiver 端的 buffer 还有多少剩余空间,然后 send 会根据剩余空间,控制发送速率。

任务执行流程

  1. 编写代码

    根据用户编写的代码会生成最初的DAG图,用来表示程序的拓扑结构,即逻辑流图(streamGraph)

  2. 编译阶段,生成 作业图(JobGraph)

    客户端会将 StreamGraph 生成 JobGraph,主要是做一些算子链合并,JobGraph 就是做为向集群提交的最基本的单元。有了 JobGraph 后就会向集群进行提交,进入运行阶段。

  3. 运行阶段

    JobGraph 提交到集群后会生成 执行图(ExecutionGraph),ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分,分解为不同的 subTask,并明确了任务间数据传输的方式。

上图 ExecutionGraph 中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 ExecutionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。

首先来看下 Flink 的 ExecutionGraph:

我们可以看到,上游 task 向下游 task 传输数据的时候,有 ResultPartition 和 InputGate 两个组件。

其中 ResultPartition 用来发送数据,InputGate 用来接收数据。

这个问题可以被看成两个问题,一个是跨 TaskManager,一个是 TaskManager 内部。譬如说下游的 sink 性能出了点问题,那么 inputChannel 就会告知上个 TaskManager 的 ResultPartition。然后第二个 TaskManager 中的,ResultPartition 也会通知该 TaskManager中 的 InputGate。

跨 TaskManager 的反压

先来看第一个问题,跨 TaskManager 的反压。

发送数据需要 ResultPartition,在每个 ResultPartition 里面会有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。每一个 TaskManager 会有一个统一的网络缓冲池(Network BufferPool),被所有Task共享,在初始化时会从堆外内存( Off-heap Memory )中申请内存,申请内存后就可以为每一个 ResultSubPartition 创建 本地缓冲池(Local BufferPool)

这个是跨 TaskManager 的网络传输的流程。理解过 Flink 内存管理的同学都会知道,Flink 会向 off-heap 内存申请一段固定的内存来使用,作为 NetWork BufferPool。然后 ResultPartition 和 InputGate 都会向 LocalBufferPool 申请内存资源,LocalBufferPool 会向 NetWorkBufferPool 申请资源。至于 Netty 直接走的 JVM。

当 sink 端数据处理不过来的时候,InputGate 会不断向 LocalBufferPool 申请内存,导致 LocalBufferPool 会不断向 NetWorkBufferPool 申请内存。最终导致 NetworkBufferPool 的可用内存被申请完。

当 InputGate 无法写入的时候,就不会去读 netty 里的数据。当 netty 的规定缓存被写满了以后,socket 就无法往 netty 里写数据。这个时候,socket 的缓存就很快会被用满,使用 TCP 的 ACK 机制,通知上游 Socket。然后上游的 Socket 由于无法往外写,所以上游的 Socket 也很快会被上游的 netty 写满。

所以此时,数据就会被不断缓存在上游的 netty,当上游的 netty 的 buffer 写到限制大小后,就会不能被写入。这个时候 netty 的 channel.isWritable() 就会返回 false。

上游的 ResultPartition 每次往 netty 写入数据的时候,都会通过 netty 的 channel.isWritable()。来判断 netty 是否能被写入。当 channel.isWritable() 返回 false 的时候,就会发生阻塞。所以当 ResultPartition 被写满的时候,就会去向 LocalBufferPool 请求内存,导致 LocalBufferPool 会向 NetworkBufferPool 请求内存,如果下游处理数据的速度一直跟不上产生数据的速度,那么会最终导致,上游的 NetworkBufferPool 的可用内存被申请完,产生堵塞。

TaskManager 内部的反压

以上是跨 TaskManager 的反压过程,下面是 TaskManager 内部的反压过程,也就是 ResultPartition 到 InputGate。其实和上面的类似。

当 ResultPartition 写入被堵塞的时候,由于 RecordWriter 和 RecordReader 是在同一个线程内,这个是 RecordReader 也被堵塞了。这个是就会重复上面的堵塞过程,导致 InputGate 也无法使用了。

这就是 1.5 版本以前的反压机制。

1.5 版本以后

TCP反压的弊端

先来说下 TCP 这种方式的弊端:

  1. 在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。。

  2. 依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。

Credit-based 反压过程

credit 的反压是类似于 TCP 的反压,但它是在 Flink 层面上的。可以看下图:

在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信)。如果这个时候发现 backlog > credit,那么 LocalBufferPool 就会向 NetWorkPool 申请内存。

长此以往,当 credit 返回 0 的时候,表示没有内存缓存了,那么 ResultSubPartition 接收到 credit 的时候,就不会继续往 netty 写数据了。这样 socket 就不会堵塞了,同时生效延迟也降低了。同时 ResultSubPartition 也会不断去探测 InputGate 是否有空余的空间。

这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题。

问题

有了动态反压,那么静态反压是不是就没有用了?

答案当然不是,这个很大程度上取决于 sink 端数据存储在哪里。譬如说,数据最终落地在 es 中,那么由于前面数据量过大,es 端直接报错,整个任务就停止了,这个时候动态反压就没有了,那么静态的限制 consumer 的量就很关键了,但是这个设置是在 1.8 版本里才有。

Flink 页面上监控反压的数据来源

在源码包中,有个 md 文件,对获取 back pressure 的 ratio 值做了解释。

默认情况下,每 50ms 发送 100 次探测,通过调用 Thrad.getStackTrace() 方法来探测,从网络堆栈请求缓存区。如果有的任务比较紧张,那么有的请求就会卡主,以此来监控任务是否存在反压。中期,0.01 表示 100 次中有 1 次探测卡主了。

Reference


12-Flink 流控和反压
https://flepeng.github.io/045-Flink-31-基础-12-Flink-流控和反压/
作者
Lepeng
发布于
2021年3月8日
许可协议