14-Flink 重启策略

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/

介绍

当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。重启的策略有 2 种方式配置。

  • 基于配置文件的全局配置 flink-conf.yaml。定义策略的配置 key 为: restart-strategy。
  • 基于代码的作业级别配置。通过调用 setRestartStrategyon 上的方法以编程方式设置重新启动策略 ExecutionEnvironment。请注意,这也适用于 StreamExecutionEnvironment。

重启(Restart Strategies )策略种类

Flink 支持不同的重启策略:

  • 固定延迟重启策略(Fixed Delay Restart Strategy)
  • 故障率重启策略(Failure Rate Restart Strategy)
  • 没有重启策略(No Restart Strategy)
  • 后背重启策略(Fallback Restart Strategy)

注意:

  • 如果未启用检查点,则使用“无重启”策略。
  • 如果激活了检查点但未配置重启策略,则使用“固定延迟策略”:restart-strategy.fixed-delay.attempts: Integer.MAX_VALUE

固定延迟重启策略(Fixed Delay Restart Strategy)

固定延迟重启策略尝试给定次数重新启动作业。如果超过最大尝试次数,则作业最终会失败。在两次连续重启尝试之间,重启策略等待一段固定的时间。

通过在 flink-conf.yaml 配置文件中设置以下配置参数,此策略默认启用。

1
restart-strategy: fixed-delay
配置参数 描述 默认值
restart-strategy.fixed-delay.attempts 指定最大重启次数,超过该次数后作业会被失败。 1,或者 Integer.MAX_VALUE 如果通过检查点激活
restart-strategy.fixed-delay.delay 指定重启之间的延迟时间。 akka.ask.timeout,如果通过检查点激活,则为10秒

例如:

1
2
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

编程方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
mport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

public class FixedDelayRestartStrategyExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置重启策略为FixedDelayRestartStrategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最大重启次数
Time.seconds(10) // 重启延迟时间
));

// 执行作业的代码
env.execute("FixedDelayRestartStrategyExample");
}
}

故障率重启策略(Failure Rate Restart Strategy)

故障率重启策略在故障后重新启动作业,但是当 failure rate 超过(每个时间间隔的故障)时,作业最终会失败。在两次连续重启尝试之间,重启策略等待一段固定的时间。

通过在 flink-conf.yaml 中设置以下配置参数,此策略默认启用。

1
restart-strategy: failure-rate
配置参数 描述 默认值
restart-strategy.failure-rate.max-failures-per-interval 失败作业之前给定时间间隔内的最大重启次数 1
restart-strategy.failure-rate.failure-rate-interval 测量故障率的时间间隔。 1分钟
restart-strategy.failure-rate.delay 两次连续重启尝试之间的延迟 akka.ask.timeout
1
2
3
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

也可以通过编程方式设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

public class FailureRateRestartStrategyExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置重启策略为FailureRateRestartStrategy
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 失败率阈值
Time.minutes(5), // 检测时间窗口
Time.seconds(10) // 延迟时间
));
// 执行作业的代码
env.execute("FailureRateRestartStrategyExample");
}
}

没有重启策略(No Restart Strategy)

作业直接失败,不尝试重启。这种策略适合于批处理作业或者不需要容错机制的流处理作业。

1
restart-strategy: none

也可以通过编程方式设置 no restart 策略:

1
2
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

后背重启策略(Fallback Restart Strategy)

后备重启策略,保底策略,即所谓的默认重启策略;集群中如果没有在配置文件(flink-conf.yaml)中显示的配置重启策略,也没有在编程中织入,在检查点机制开启的情况下,任务失败,flink会默认的选用Fixed Delay Restart Strategy重启,且会无限尝试重连(Integer.MAX_VALUE次)。


14-Flink 重启策略
https://flepeng.github.io/044-Flink-42-核心概念-14-Flink-重启策略/
作者
Lepeng
发布于
2021年3月8日
许可协议