01-Flink 基础

官网保平安:https://flink.apache.org/

Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。Flink 提供了诸多高抽象层的 API 以便用户编写分布式任务。

  • DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用 Flink 提供的各种操作符对分布式数据集进行处理,支持 Java、Scala 和 Python。

  • DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持 Java 和 Scala。

  • Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类 SQL 的 DSL 对关系表进行各种查询操作,支持 Java 和 Scala。

此外,Flink 还针对特定的应用领域提供了领域库,例如:Flink ML,Flink 的机器学习库,提供了机器学习 Pipelines API 并实现了多种机器学习算法。Gelly,Flink 的图计算库,提供了图计算的相关 API 及多种图计算算法实现。

  1. Flink 是实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型。(根本区别,一定要说出来)

  2. 架构模型:Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor。Flink 在运行时主要包含:Jobmanager、Taskmanager 和 Slot。

  3. 时间机制:Spark Streaming 只支持处理时间。Flink 支持处理时间、事件时间、注入时间。同时也支持 Watermark 来处理滞后数据。

  4. 容错机制:

    • 对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。
    • Flink 则使用 两阶段提交协议 和 checkpoint 实现精准一次处理,容错性好。
    • Spark Streaming 的 checkpoint 仅仅是针对 driver 的故障恢复做了数据和元数据的 checkpoint。
    • Flink 的 checkpoint 机制要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。

根据 Flink 官网描述,Flink 是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。

自下而上,每一层分别代表:

  • Deploy 层:该层主要涉及了 Flink 的部署模式,在上图中我们可以看出,Flink 支持包括 local、Standalone、Cluster、Cloud 等多种部署模式。
  • Runtime 层:Runtime 层提供了支持 Flink 计算的核心实现,比如:支持分布式 Stream 处理、JobGraph 到 ExecutionGraph 的映射、调度等等,为上层 API 层提供基础服务。
  • API 层:API 层主要实现了面向流(Stream)处理和批(Batch)处理 API,其中面向流处理对应 DataStream API,面向批处理对应 DataSet API,后续版本,Flink 有计划将 DataStream 和 DataSet API 进行统一。
  • Libraries 层:该层称为 Flink 应用框架层,根据 API 层的划分,在 API 层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于 SQL-like 的操作(基于 Table 的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。

上图是来自 Flink 官网的运行流程图。通过上图我们可以得知,Flink 程序的基本构建是:数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个 Sink 接收器中结束。数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。执行时,Flink 程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。

简单来说:

  • Source:数据输入。
  • Transformation:中间转换。
  • Sink:数据输出。

Flink 程序在运行时主要有 TaskManager、JobManager、Client 三种角色。

  • JobManager 扮演着集群中的管理者 Master 的角色,它是整个集群的协调者,负责接收 Flink Job,协调检查点,Failover 故障恢复等,同时管理 Flink 集群中从节点 TaskManager。

  • TaskManager 是实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task,每个 TaskManager 负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向 JobManager 汇报。

  • Client 是 Flink 程序提交的客户端,当用户提交一个 Flink 程序时,会首先创建一个 Client,该 Client 首先会对用户提交的 Flink 程序进行预处理,并提交到 Flink 集群中处理,所以 Client 需要从用户提交的 Flink 程序配置中获取 JobManager 的地址,并建立到 JobManager 的连接,将 Flink Job 提交给 JobManager。

简单来说:

  • TaskManager:实际执行计算,与 TaskManager 状态的传递。
  • JobManager:任务和资源管理,管理 TaskManager、协调 Checkpoints、故障恢复。
  • Client:与 JobManager 交互并把任务提交到集群。

在 Flink 架构角色中我们提到,TaskManager 是实际负责执行计算的 Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个 task 或多个 subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。

简单的说,TaskManager 会将自己节点上管理的资源分为不同的 Slot:固定大小的资源子集。这样就避免了不同 Job 的 Task 互相竞争内存资源,但是需要主要的是,Slot 只会做内存的隔离。没有做 CPU 的隔离。

Flink 中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可以从四个不同层面设置并行度:

  1. 系统层面(Flink 客户端的配置 yml 文件中设置)。
  2. 客户端层面(提交 flink run -p 的时候设置)。
  3. 执行环境层面(构建 Flink 环境时 getExecutionEnvironment.setParallelism(2) 设置)。
  4. 算子层面(算子.setParallelism(3),实际算子时设置)。

优先级:算子设置>执行环境>客户端>系统

实际业务中通常设置和 kafka 分区数一样或者 Kafka 分区倍数的并行度。

  • Slot 是指 TaskManager 的并发执行能力,假设我们将 taskmanager.numberOfTaskSlots 设置为 3,那么每一个 TaskManager 中分配 3 个 TaskSlot, 3 个 TaskManager 一共有 9 个 TaskSlot。
  • parallelism 是指 TaskManager 实际使用的并发能力。假设我们把 parallelism.default 设置为 1,那么 9 个 TaskSlot 只能用 1 个,有 8 个空闲。

Flink的算子分为两大类:一类是 DataSet,一类是DataStream

DataSet

Source 算子

  1. fromCollection:从本地集合读取数据
  2. readTextFile:从文件中读取
  3. readTextFile:遍历目录。readTextFile 可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式
  4. readTextFile:读取压缩文件。对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

Transform 转换算子。因为 Transform 算子基于 Source 算子操作,所以首先构建 Flink 执行环境及 Source 算子,后续 Transform 算子操作基于此:

  1. map:将 DataSet 中的每一个元素转换为另外一个元素

  2. flatMap:将 DataSet 中的每一个元素转换为 0…n 个元素。

  3. filter:过滤出来一些符合条件的元素

  4. reduce:对一个 DataSet 或者一个 group 来进行聚合计算,最终聚合成一个元素

  5. reduceGroup:将一个 DataSet 或者一个 group 聚合成一个或多个元素。
    reduceGroup 是 reduce的 一种优化方案;它会先分组 reduce,然后在做整体的 reduce;这样做的好处就是可以减少网络 IO

  6. minBy 和 maxBy:选择具有最小值或最大值的元素

  7. Aggregate:在数据集上进行聚合求最值(最大值、最小值)

  8. distinct:去除重复的数据

  9. first:取前N个数

  10. join:将两个 DataSet 按照一定条件连接到一起,形成新的 DataSet

  11. union:联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重

  12. rebalance:Flink 也有数据倾斜的时候,rebalance 内部使用 round robin 方法将数据均匀打散。

  13. partitionByHash:按照指定的 key 进行 hash 分区

Sink 算子

  1. collect:将数据输出到本地集合
  2. writeAsText:将数据输出到文件

DataStream

Source 算子,和 DataSet 基本相同

Transform 转换算子

  1. map:将 DataSet 中的每一个元素转换为另外一个元素。
  2. FlatMap:采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的 flatmap 函数。
  3. Filter:计算每个数据元的布尔函数,并保存函数返回 true 的数据元。过滤掉零值的过滤器。
  4. KeyBy:逻辑上将流分区为不相交的分区。具有相同 Keys 的所有记录都分配给同一分区。在内部,keyBy 是使用散列分区实现的。指定键有不同的方法。此转换返回 KeyedStream,其中包括使用被 Keys 化状态所需的 KeyedStream。
  5. Reduce:被 Keys 化数据流上的“滚动”Reduce。将当前数据元与最后一个 Reduce 的值组合并发出新值。
    传入两个 Tuple2<String,Integer> 类型的元素,传出一个 Tuple2<String,Integer> 类型的元素。
  6. Aggregations:在被 Keys 化数据流上滚动聚合。min 和 minBy 之间的差异是 min 返回最小值,而 minBy 返回该字段中具有最小值的数据元(max 和 maxBy 相同)。
    1
    2
    3
    4
    5
    keyedStream.sum(0);
    keyedStream.min(0);
    keyedStream.max(0);
    keyedStream.minBy(0);
    keyedStream.maxBy(0);
  7. Window:可以在已经分区的 KeyedStream 上定义 Windows。Windows 根据某些特征(例如,在最后 5 秒内到达的数据)对每个 Keys 中的数据进行分组。dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
  8. WindowAll:Windows 可以在常规 DataStream 上定义。Windows 根据某些特征(例如,在最后 5 秒内到达的数据)对所有流事件进行分组。
    注意:在许多情况下,这是非并行转换。所有记录将收集在 windowAll 算子的一个任务中。
    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  9. Window Apply 将一般函数应用于整个窗口。注意:如果您正在使用 windowAll 转换,则需要使用 AllWindowFunction。
  10. Window Reduce:将函数缩减函数应用于窗口并返回缩小的值
  11. Union:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元 dataStream.union(otherStream1, otherStream2, ...)
  12. Window Join:在给定 Keys 和公共窗口上连接两个数据流
  13. Interval Join:在给定的时间间隔内使用公共 Keys 关联两个被 Key 化的数据流的两个数据元 e1 和 e2,以便 e1.timestamp + lowerBound
  14. Connect:“连接”两个保存其类型的数据流。连接允许两个流之间的共享状态
  15. Split:根据某些标准将流拆分为两个或更多个流
  16. Select:从拆分流中选择一个或多个流

Sink 算子,支持将数据输出到:

  • 本地文件(参考批处理)
  • 本地集合(参考批处理)
  • HDFS(参考批处理)

除此之外,还支持:

  • sink 到 Kafka
  • sink 到 MySQL
  • sink 到 Redis

分区策略是用来决定数据如何发送至下游。目前 Flink 支持了 8 种分区策略的实现:

  • GlobalPartitioner:数据会被分发到下游算子的第一个实例中进行处理。
  • ShufflePartitioner:数据会被随机分发到下游算子的每一个实例中进行处理。
  • RebalancePartitioner:数据会被循环发送到下游的每一个实例中进行处理。
  • RescalePartitioner:这种分区器会根据上下游算子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为2,编号为 A 和 B。下游并行度为 4,编号为 1,2,3,4。那么 A 则把数据循环发送给 1 和 2,B 则把数据循环发送给 3 和 4。假设上游并行度为 4,编号为 A,B,C,D。下游并行度为 2,编号为 1,2。那么 A 和 B 则把数据发送给 1,C 和 D 则把数据发送给 2。
  • BroadcastPartitioner:广播分区会将上游数据输出到下游算子的每个实例中。适合于大数据集和小数据集做 Jion 的场景。
  • ForwardPartitioner:用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner 用来做数据的控制台打印。
  • KeyGroupStreamPartitioner Hash:会将数据按 Key 的 Hash 值输出到下游算子实例中。
  • CustomPartitionerWrapper:用户自定义分区器。需要用户自己实现 Partitioner 接口,来定义自己的分区逻辑。

Flink 支持两种划分窗口的方式,按照 time 和 count。如果根据时间划分窗口,那么它就是一个 time-window;如果根据数据划分窗口,那么它就是一个 count-window。Flink 支持窗口的两个重要属性(size和interval)

  • 如果 size=interval,那么就会形成 tumbling-window(无重叠数据)
  • 如果 size>interval,那么就会形成 sliding-window (有重叠数据)
  • 如果 size<interval,那么这种窗口将会丢失数据。比如每 5 秒钟,统计过去 3 秒的通过路口汽车的数据,将会漏掉 2 秒钟的数据。

通过组合可以得出四种基本窗口:

  • time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))
  • time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))
  • count-tumbling-window 无重叠数据的数量窗口,设置方式举例:countWindow(5)
  • count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)
  • 固定延迟重启策略。表示在作业失败后会进行固定延迟的重启,直到达到最大重启次数或作业成功执行。该重启策略适用于故障是暂时性的,可以通过重试来解决的情况。
  • 故障率重启策略。根据作业的最近一段时间的故障率来判断是否进行重启。当故障率超过设定的阈值时,作业会进行重启。
  • 没有重启策略。作业遇到故障后直接失败并停止。这种策略适合于批处理作业或者不需要容错机制的流处理作业。
  • Fallback 重启策略(默认是这个 Fallback 重启策略)。它根据指定的重启次数和重启延迟时间,进行固定次数的重启,如果重启次数用完后仍然失败,则作业失败。
  • 事件时间(event time):数据本身真正产生时间,(生产环境中用这个)。
  • 摄取时间(ingestion time):Flink 读取数据时的时间。适用场景:存在多个 Source Operator 的情况下,每个 Source Operator 可以使用自己本地系统时钟指派 Ingestion Time。后续基于时间相关的各种操作,都会使用数据记录中的 Ingestion Time
  • 处理时间(processing time):Flink 中算子处理数据的时间。适用场景:没有事件时间的情况下,或者对实时性要求超高的情况。

本道面试题考察的其实就是一句话:Flink 的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了 DataSet API 和 DataStream API。

在一个 Flink Job 中,数据需要在不同的 task 中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲 buffer 中收集 records,然后再发送。

Records 并不是一个一个被发送的,而是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。

Flink 源码中有一个独立的 connector 模块,所有的其他 connector 都依赖于此模块。

Flink 在 1.9 版本发布的全新 Kafka 连接器,摒弃了之前连接不同版本的 kafka 集群需要依赖不同版本的 connector 这种做法,只需要依赖一个 connector 即可。

Flink 可以不依赖 Hadoop 组件执行,例如可以跑单机版(但是实际生产环境使用通常是提交 Flink 到 yarn 上运行,生产是需要 Hadoop 组件的)

但是做为大数据的基础设施,Hadoop 体系是任何大数据框架都绕不过去的。Flink 可以集成众多 Hadooop 组件,例如 Yarn、Hbase、HDFS 等等。

  • Flink 可以和 Yarn 集成做资源调度,也可以读写 HDFS。
  • Flink 利用 HDFS 做检查点。

Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。

  • 托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;
  • 原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

托管状态分为算子状态(Operator State)和按键分区状态(Keyed State)。我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink 能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。

  • 算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。
  • 按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以 key 为作用范围进行隔离。需要注意,使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。

实际生产中通常使用 Keyed State 中的 ValueState、MapState、ListState。

存在状态后端

1.13 版本之前

  • MemoryStateBackend 开发时使用
  • FsStateBackend 生产时使用,常用
  • RocksDBStateBackend 生产时使用,非常大的状态时用

1.13 版本之后

  • HashMapStateBackend 相当于 MemoryStateBackend 和 FsStateBackend,根据 API 不同
  • EmbeddedRocksDBStateBackend 生产时使用,非常大的状态时用

公司怎么提交的实时任务,有多少 Job Manager?

Flink 提交有几种模式

  • yarn-session 模式。
  • Application 模式。
  • Per-Job 模式。
  1. 我们使用 yarn session 模式提交任务。每次提交都会创建一个新的 Flink 集群,为每一个 job 提供一个 yarn-session,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。线上命令脚本如下:

    bin/yarn-session.sh -n 7 -s 8 -jm 3072 -tm 32768 -qu root.*.* -nm *-* -d 其中申请 7 个 taskManager,每个 8 核,每个 taskmanager 有 32768M 内存。

  2. 集群默认只有一个 Job Manager。但为了防止单点故障,我们配置了高可用。我们公司一般配置一个主 Job Manager,两个备用 Job Manager,然后结合 ZooKeeper 的使用,来达到高可用。

checkpoint

作业未能成功进行 Checkpoint 的原因可能有以下几种:

  • 数据倾斜:如果作业中的某些任务处理的数据量过大,可能会导致这些任务阻塞,从而影响整个作业的进度。这种情况下,你可能需要调整任务的分配策略,以减少数据倾斜的影响。

  • 反压:反压是指任务在处理数据时,由于某些原因(如内存不足、磁盘IO瓶颈等)而无法及时处理数据,从而导致任务阻塞。这种情况下,你可能需要优化任务的执行环境,以提高任务的执行效率。

  • Checkpoint 配置问题:Checkpoint的配置参数(如 Checkpoint 间隔、Checkpoint 保留时间等)可能不合适,导致 Checkpoint 无法按时完成。这种情况下,你可能需要调整 Checkpoint 的配置参数。

  • 系统资源限制:如果系统的资源(如内存、磁盘空间等)不足以支持作业的Checkpoint,也可能导致 Checkpoint 失败。这种情况下,你可能需要增加系统的资源。

  • 网络延迟或其他 I/O 问题。

Reference


01-Flink 基础
https://flepeng.github.io/interview-44-数据处理-44-Flink-01-Flink-基础/
作者
Lepeng
发布于
2020年8月8日
许可协议