10-Flink 状态管理-状态后端

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/

1、状态后端(State Backends)

在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫状态后端(state backend)。状态后端主要负责管理本地状态的储存方式和位置。

2、状态后端的分类(HashMapStateBackend/RocksDB)

状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink 中提供了不同的状态后端

1.13 版本之前有三种

  • MemoryStateBackend 开发时使用
  • FsStateBackend 生产时使用,常用
  • RocksDBStateBackend 生产时使用,非常大的状态时用

1.13 版本之后合并为两种

  • HashMapStateBackend 哈希表状态后端,相当于 MemoryStateBackend 和 FsStateBackend,默认值
  • EmbeddedRocksDBStateBackend 内嵌 RocksDB 状态后端,生产时使用,非常大的状态时用

3、1.13 版之前

https://nightlies.apache.org/flink/flink-docs-release-1.8/ops/state/state_backends.html

Flink 自带了以下几种开箱即用的 StateBackend:

  • MemoryStateBackend:默认值,小状态,本地调试使用。
  • FsStateBackend:大状态,长窗口,高可用场景。
  • RocksDBStateBackend:超大状态,长窗口,高可用场景,可增量 checkpoint。

3.1、MemoryStateBackend

MemoryStateBackend 是将状态维护在 Java 堆上的一个内部状态后端。键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。

  • MemoryStateBackend 将 state 保存在 TaskManager 的 Java 堆上。
  • checkpoint 时,会对当前的状态进行快照,并且将其作为 checkpoint ACK 消息的一部分发送给 JobManager(master)。即把 state 的快照数据保存到 JobManager 的内存中。

默认情况下,MemoryStateBackend 配置成支持异步快照。异步快照可以避免阻塞数据流的处理,从而避免反压的发生。当然,使用 new MemoryStateBackend(MAX_MEM_STATE_SIZE, false) 也可以禁用该特点。

MemoryStateBackend 的四个构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
public MemoryStateBackend() {
this(5242880);
}
public MemoryStateBackend(int maxStateSize) {
this(maxStateSize, true);
}
public MemoryStateBackend(boolean asynchronousSnapshots) {
this(5242880, asynchronousSnapshots);
}
public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
this.maxStateSize = maxStateSize;
this.asynchronousSnapshots = asynchronousSnapshots;
}

MemoryStateBackend 的局限性

  • 单个状态的大小默认情况下最大为 5MB。这个值可以通过 MemoryStateBackend 构造函数进行增加。状态大小受到 akka 帧大小的限制(maxStateSize <= akka.framesize 默认 10 M),所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。可以通过 akka.framesize 调整 akka 帧大小。
  • 聚合的状态必须在 JobManager 的内存中能存放。
  • 状态的总大小不能超过 JobManager 的内存。

MemoryStateBackend 适用于

  • 本地开发和调试。
  • 只有很小状态的作业,例如作业只由 record-at-a-time 函数组成(Map,FlatMap,Filter,…)。Kafka 消费者只需要非常小的状态。

3.2、FsStateBackend

FsStateBackend 基于文件系统进行存储,需要配置存储的文件系统,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。

  • HDFS 路径:hdfs://namenode:40010/flink/checkpoints
  • 文件系统 路径:file:///data/flink/checkpoints
  • s3 路径:s3://flink/checkpoints

虽然选择了 FsStateBackend,但正在使用的 state 数据仍是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将 state 的快照数据写入到指定文件系统(hdfs,s3)上,同时会在 JobManager 的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。

FsStateBackend 默认使用异步快照,以避免在状态 checkpoint 时阻塞数据流的处理。该特性可以实例化 FsStateBackend 时传入false的布尔标志来禁用掉,例如:new FsStateBackend(path, false)

FsStateBackend 三种构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
this(new Path(checkpointDataUri), asynchronousSnapshots);
}
public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
this(checkpointDataUri.toUri(), asynchronousSnapshots);
}
//fileStateSizeThreshold默认1024
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots) throws IOException {
Preconditions.checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
Preconditions.checkArgument(fileStateSizeThreshold <= 1048576, "The threshold for file state size cannot be larger than %s", new Object[]{1048576});
this.fileStateThreshold = fileStateSizeThreshold;
this.basePath = validateAndNormalizeUri(checkpointDataUri);
this.asynchronousSnapshots = asynchronousSnapshots;
}

FsStateBackend 的推荐使用的场景

  • 处理大状态,长窗口,或大键值状态的有状态处理任务,例如分钟级窗口聚合或 join。
  • 适合用于高可用方案(需要开启HA的作业)
  • 可以在生产环境中使用。

3.3、RocksDBStateBackend

RocksDBStateBackend 的配置也需要一个文件系统(类型,地址,路径),比如:hdfs://namenode:40010/flink/checkpointss3://flink/checkpoints

RocksDB 是一种嵌入式的本地数据库。RocksDBStateBackend 会把 state 直接写入本地 RocksDB 中(默认存储在 TaskManager 的 data 目录下)。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。 fail over 的时候从 filesystem 中恢复到本地。

RocksDB 克服了 state 受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

RocksDBStateBackend 局限性

  • RocksDB 支持的单 key 和单 value 的大小最大为每个 2^31 字节。这是因为 RocksDB 的 JNI API 是基于 byte[] 的。
  • 对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败。

RocksDBStateBackend 推荐使用的场景

  • 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
  • 非常适合用于高可用方案。
  • 最好是对状态读写性能要求不高的作业。

注意:

RocksDBStateBackend 是目前唯一提供增量 checkpoint 的状态后端。

与增量快照相反的是全量快照,全量快照很好理解,在 Checkpoint 执行时,Flink 作业将当前所有的状态数据全部备份到远程文件系统中,这就是全量快照。而在生产环境中,大多数 Flink 作业两次快照的间隔中发生变化的状态数据只占整体状态数据的一小部分,基于这个特点,增量快照诞生了,增量快照的特点在于每一次快照要持久化的数据只包含自上一次快照完成之后发生变化(被修改)的状态数据,所以可以显著减少持久化快照文件的大小以及执行快照的耗时。

RocksDB 介绍

RocksDB 是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中,但需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。RocksDB 默认也是配置成异步快照的模式。不过 RocksDB 支持增量的 Checkpoint,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。

3.4、StateBackend 对比

3.4.1、不同 StateBackend 吞吐量对比

  1. FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。

  2. Standalone 和 on Yarn 的总体差异不大,使用 FileSystem 和 Memory 时 on Yarn 模式下吞吐稍高,相反的使用 RocksDB 时 Standalone 模式下的吞吐稍高。

3.4.2、不同 StateBackend 延迟对比

  1. FileSystem 和 Memory 时延迟基本一致且较低。

  2. RocksDB 时延迟稍高,且由于吞吐较低,在达到吞吐瓶颈附近延迟陡增。其中 on Yarn 模式下吞吐更低,延迟变化更加明显。

3.4.3、StateBackend 的选择

StateBackend in-flight checkpoint 吞吐 推荐使用场景
MemoryStateBackend TM Memory JM Memory 调试、无状态或对数据丢失或重复无要求
FsStateBackend TM Memory FS/HDFS 普通状态、窗口、KV 结构
RocksDBStateBackend RocksDB on TM FS/HDFS 超大状态、超长窗口、大型 KV 结构

3.5、StateBackend 设置方式

  1. 在代码中配置,生效范围为当前 job

    1
    2
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
  2. 在配置文件 flink-conf.yaml 中设置,生效范围,部署在这个集群中的所有job。 state.backend 值:

    • jobmanager (MemoryStateBackend)
    • filesystem (FsStateBackend)
    • rocksdb (RocksDBStateBackend)

    或者实现了状态后端工厂 FsStateBackendFactory 的类的完全限定类名,例如,为 RocksDBStateBackend 设置为 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

    配置文件示例

    1
    2
    3
    4
    5
    6
    7
    8
    # The backend that will be used to store operator state checkpoints
    state.backend: filesystem
    # Directory for storing checkpoints
    state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
    # 检查点中保存的数据是否采用增量的方式
    state.backend.incremental: false
    # flink应用失败后的重启策略
    jobmanager.execution.failover-strategy: region

4、1.13 版之后

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/state_backends/

从 Flink 1.13 版本开始,Flink 统一了不同状态后端的 Savepoint 的二进制格式,因此我们可以使用一种状态后端生成 Savepoint 并且使用另一种状态后端进行恢复,这可以帮助我们在极致的状态访问性能(HashMap 状态后端)以及支持大容量的状态存储(RocksDB 状态后端)之间进行灵活切换。

4.1、哈希表状态后端(HashMapStateBackend)

HashMapStateBackend 是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

4.2、内嵌 RocksDB 状态后端(EmbeddedRocksDBStateBackend)

RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置 EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB 默认存储在 TaskManager 的本地数据目录里。

RocksDB 的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用 .hashCode().equals() 方法。

EmbeddedRocksDBStateBackend 始终执行的是异步快照,所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

4.3、选择状态后端

HashMap 和 RocksDB 两种状态后端最大的区别,在于本地状态存放在哪里。

HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。

4.4、状态后端的配置

在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为 state.backend。配置项的值为

  • hashmap,这样配置的就是 HashMapStateBackend;
  • rocksdb,这样配置的就是 EmbeddedRocksDBStateBackend。

这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

例子

1
2
3
4
5
# 默认状态后端
state.backend: hashmap

# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints

为每个作业(Per-job/Application)单独配置状态后端。通过执行环境设置,HashMapStateBackend。

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

通过执行环境设置,EmbeddedRocksDBStateBackend。

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>

而由于 Flink 发行版中默认就包含了 RocksDB(服务器上解压的 Flink),所以只要我们的代码中没有使用 RocksDB 的相关内容,就不需要引入这个依赖。

5、状态后端的使用注意事项

区分键值状态和算子状态

由于算子状态数据只会存储在 SubTask 内存中,因此在生产环境中要严格区分键值状态和算子状态的使用场景,避免因为将算子状态当做键值状态使用而导致出现内存溢出的问题。如下图:

ValueState<HashMap<String, String>>MapState<String, String> 的选型

作为初学者来说,如果要在键值状态中存储 Map<String, String> 数据结构的状态,可能会认为使用 ValueState<HashMap<String, String>> 或者使用 MapState<String, String> 都是可行的。

如果我们选择使用 HashMap 状态后端,那么两种方式的性能上不会有很大差异,但是如果我们选择使用 RocksDB 状态后端,则推荐使用 MapState<String, String>,避免使用 ValueState<HashMap<String, String>>。因为 ValueState<HashMap<String, String>> 在将数据写入 RocksDB 时,是将一整个 HashMap<String, String> 序列化为字节数组之后写入的。同样,在读取时,也是先读取到字节数组,然后反序列化为一整个 HashMap<String, String> 后,再给用户使用。所以每次访问和更新 ValueState 时,实际上都是对 HashMap<String, String> 这个集合类的大对象做序列化以及反序列化,而这是一个及其耗费资源的过程,很容易就会导致 Flink 作业产生性能瓶颈,所以极不推荐在 ValueState 中存储大对象。


10-Flink 状态管理-状态后端
https://flepeng.github.io/044-Flink-42-核心概念-10-Flink-状态管理-状态后端/
作者
Lepeng
发布于
2021年3月8日
许可协议