01-Flink 基础

官网保平安:https://flink.apache.org/

Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。Flink 提供了诸多高抽象层的 API 以便用户编写分布式任务。

  • DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用 Flink 提供的各种操作符对分布式数据集进行处理,支持 Java、Scala 和 Python。

  • DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持 Java 和 Scala。

  • Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类 SQL 的 DSL 对关系表进行各种查询操作,支持 Java 和 Scala。

此外,Flink 还针对特定的应用领域提供了领域库,例如:Flink ML,Flink 的机器学习库,提供了机器学习 Pipelines API 并实现了多种机器学习算法。Gelly,Flink 的图计算库,提供了图计算的相关 API 及多种图计算算法实现。

  1. Flink 是实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型。(根本区别,一定要说出来)

  2. 架构模型:Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor。Flink 在运行时主要包含:Jobmanager、Taskmanager 和 Slot。

  3. 时间机制:Spark Streaming 只支持处理时间。Flink 支持处理时间、事件时间、注入时间。同时也支持 Watermark 来处理滞后数据。

  4. 容错机制:

    • 对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。
    • Flink 则使用 两阶段提交协议 和 checkpoint 实现精准一次处理,容错性好。
    • Spark Streaming 的 checkpoint 仅仅是针对 driver 的故障恢复做了数据和元数据的 checkpoint。
    • Flink 的 checkpoint 机制要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。

根据 Flink 官网描述,Flink 是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。

自下而上,每一层分别代表:

  • Deploy 层:该层主要涉及了 Flink 的部署模式,在上图中我们可以看出,Flink 支持包括 local、Standalone、Cluster、Cloud 等多种部署模式。
  • Runtime 层:Runtime 层提供了支持 Flink 计算的核心实现,比如:支持分布式 Stream 处理、JobGraph 到 ExecutionGraph 的映射、调度等等,为上层 API 层提供基础服务。
  • API 层:API 层主要实现了面向流(Stream)处理和批(Batch)处理 API,其中面向流处理对应 DataStream API,面向批处理对应 DataSet API,后续版本,Flink 有计划将 DataStream 和 DataSet API 进行统一。
  • Libraries 层:该层称为 Flink 应用框架层,根据 API 层的划分,在 API 层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于 SQL-like 的操作(基于 Table 的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。

上图是来自 Flink 官网的运行流程图。通过上图我们可以得知,Flink 程序的基本构建是:数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个 Sink 接收器中结束。数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。执行时,Flink 程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。

简单来说:

  • Source:数据输入。
  • Transformation:中间转换。
  • Sink:数据输出。

Flink 程序在运行时主要有 TaskManager、JobManager、Client 三种角色。

  • JobManager 扮演着集群中的管理者 Master 的角色,它是整个集群的协调者,负责接收 Flink Job,协调检查点,Failover 故障恢复等,同时管理 Flink 集群中从节点 TaskManager。

  • TaskManager 是实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task,每个 TaskManager 负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向 JobManager 汇报。

  • Client 是 Flink 程序提交的客户端,当用户提交一个 Flink 程序时,会首先创建一个 Client,该 Client 首先会对用户提交的 Flink 程序进行预处理,并提交到 Flink 集群中处理,所以 Client 需要从用户提交的 Flink 程序配置中获取 JobManager 的地址,并建立到 JobManager 的连接,将 Flink Job 提交给 JobManager。

简单来说:

  • TaskManager:实际执行计算,与 TaskManager 状态的传递。
  • JobManager:任务和资源管理,管理 TaskManager、协调 Checkpoints、故障恢复。
  • Client:与 JobManager 交互并把任务提交到集群。

在 Flink 架构角色中我们提到,TaskManager 是实际负责执行计算的 Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个 task 或多个 subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。

简单的说,TaskManager 会将自己节点上管理的资源分为不同的 Slot:固定大小的资源子集。这样就避免了不同 Job 的 Task 互相竞争内存资源,但是需要主要的是,Slot 只会做内存的隔离。没有做 CPU 的隔离。

Flink 中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可以从四个不同层面设置并行度:

  1. 系统层面(Flink 客户端的配置 yml 文件中设置)。
  2. 客户端层面(提交 flink run -p 的时候设置)。
  3. 执行环境层面(构建 Flink 环境时 getExecutionEnvironment.setParallelism(2) 设置)。
  4. 算子层面(算子.setParallelism(3),实际算子时设置)。

优先级:算子设置>执行环境>客户端>系统

实际业务中通常设置和 kafka 分区数一样或者 Kafka 分区倍数的并行度。

  • Slot 是指 TaskManager 的并发执行能力,假设我们将 taskmanager.numberOfTaskSlots 设置为 3,那么每一个 TaskManager 中分配 3 个 TaskSlot, 3 个 TaskManager 一共有 9 个 TaskSlot。
  • parallelism 是指 TaskManager 实际使用的并发能力。假设我们把 parallelism.default 设置为 1,那么 9 个 TaskSlot 只能用 1 个,有 8 个空闲。

Flink的算子分为两大类:一类是 DataSet,一类是 DataStream

DataSet

Source 算子

  1. fromCollection:从本地集合读取数据
  2. readTextFile:从文件中读取
  3. readTextFile:遍历目录。readTextFile 可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式
  4. readTextFile:读取压缩文件。对于以下压缩类型,不需要指定任何额外的inputformat方法,Flink 可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

Transform 转换算子。因为 Transform 算子基于 Source 算子操作,所以首先构建 Flink 执行环境及 Source 算子,后续 Transform 算子操作基于此:

  1. map:将 DataSet 中的每一个元素转换为另外一个元素

  2. flatMap:将 DataSet 中的每一个元素转换为 0…n 个元素。

  3. filter:过滤出来一些符合条件的元素

  4. reduce:对一个 DataSet 或者一个 group 来进行聚合计算,最终聚合成一个元素

  5. reduceGroup:将一个 DataSet 或者一个 group 聚合成一个或多个元素。
    reduceGroup 是 reduce的 一种优化方案;它会先分组 reduce,然后在做整体的 reduce;这样做的好处就是可以减少网络 IO

  6. minBy 和 maxBy:选择具有最小值或最大值的元素

  7. Aggregate:在数据集上进行聚合求最值(最大值、最小值)

  8. distinct:去除重复的数据

  9. first:取前 N 个数

  10. join:将两个 DataSet 按照一定条件连接到一起,形成新的 DataSet

  11. union:联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重

  12. rebalance:Flink 也有数据倾斜的时候,rebalance 内部使用 round robin 方法将数据均匀打散

  13. partitionByHash:按照指定的 key 进行 hash 分区

Sink 算子

  1. collect:将数据输出到本地集合
  2. writeAsText:将数据输出到文件

DataStream

Source 算子,和 DataSet 基本相同

Transform 转换算子

  1. map:将 DataSet 中的每一个元素转换为另外一个元素。
  2. FlatMap:采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的 flatmap 函数。
  3. Filter:计算每个数据元的布尔函数,并保存函数返回 true 的数据元。过滤掉零值的过滤器。
  4. KeyBy:逻辑上将流分区为不相交的分区。具有相同 Keys 的所有记录都分配给同一分区。在内部,keyBy 是使用散列分区实现的。指定键有不同的方法。此转换返回 KeyedStream,其中包括使用被 Keys 化状态所需的 KeyedStream。
  5. Reduce:被 Keys 化数据流上的“滚动”Reduce。将当前数据元与最后一个 Reduce 的值组合并发出新值。
    传入两个 Tuple2<String,Integer> 类型的元素,传出一个 Tuple2<String,Integer> 类型的元素。
  6. Aggregations:在被 Keys 化数据流上滚动聚合。min 和 minBy 之间的差异是 min 返回最小值,而 minBy 返回该字段中具有最小值的数据元(max 和 maxBy 相同)。
    1
    2
    3
    4
    5
    keyedStream.sum(0);
    keyedStream.min(0);
    keyedStream.max(0);
    keyedStream.minBy(0);
    keyedStream.maxBy(0);
  7. Window:可以在已经分区的 KeyedStream 上定义 Windows。Windows 根据某些特征(例如,在最后 5 秒内到达的数据)对每个 Keys 中的数据进行分组。dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
  8. WindowAll:Windows 可以在常规 DataStream 上定义。Windows 根据某些特征(例如,在最后 5 秒内到达的数据)对所有流事件进行分组。
    注意:在许多情况下,这是非并行转换。所有记录将收集在 windowAll 算子的一个任务中。
    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  9. Window Apply 将一般函数应用于整个窗口。注意:如果您正在使用 windowAll 转换,则需要使用 AllWindowFunction。
  10. Window Reduce:将函数缩减函数应用于窗口并返回缩小的值
  11. Union:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元 dataStream.union(otherStream1, otherStream2, ...)
  12. Window Join:在给定 Keys 和公共窗口上连接两个数据流
  13. Interval Join:在给定的时间间隔内使用公共 Keys 关联两个被 Key 化的数据流的两个数据元 e1 和 e2,以便 e1.timestamp + lowerBound
  14. Connect:“连接”两个保存其类型的数据流。连接允许两个流之间的共享状态
  15. Split:根据某些标准将流拆分为两个或更多个流
  16. Select:从拆分流中选择一个或多个流

Sink 算子,支持将数据输出到:

  • 本地文件(参考批处理)
  • 本地集合(参考批处理)
  • HDFS(参考批处理)

除此之外,还支持:

  • sink 到 Kafka
  • sink 到 MySQL
  • sink 到 Redis

分区策略是用来决定数据如何发送至下游。目前 Flink 支持了 8 种分区策略的实现:

  • GlobalPartitioner:数据会被分发到下游算子的第一个实例中进行处理。
  • ShufflePartitioner:数据会被随机分发到下游算子的每一个实例中进行处理。
  • RebalancePartitioner:数据会被循环发送到下游的每一个实例中进行处理。
  • RescalePartitioner:这种分区器会根据上下游算子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为2,编号为 A 和 B。下游并行度为 4,编号为 1,2,3,4。那么 A 则把数据循环发送给 1 和 2,B 则把数据循环发送给 3 和 4。假设上游并行度为 4,编号为 A,B,C,D。下游并行度为 2,编号为 1,2。那么 A 和 B 则把数据发送给 1,C 和 D 则把数据发送给 2。
  • BroadcastPartitioner:广播分区会将上游数据输出到下游算子的每个实例中。适合于大数据集和小数据集做 Jion 的场景。
  • ForwardPartitioner:用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner 用来做数据的控制台打印。
  • KeyGroupStreamPartitioner Hash:会将数据按 Key 的 Hash 值输出到下游算子实例中。
  • CustomPartitionerWrapper:用户自定义分区器。需要用户自己实现 Partitioner 接口,来定义自己的分区逻辑。

Flink 支持两种划分窗口的方式,按照 time 和 count。如果根据时间划分窗口,那么它就是一个 time-window;如果根据数据划分窗口,那么它就是一个 count-window。Flink 支持窗口的两个重要属性(size和interval)

  • 如果 size=interval,那么就会形成 tumbling-window(无重叠数据)
  • 如果 size>interval,那么就会形成 sliding-window (有重叠数据)
  • 如果 size<interval,那么这种窗口将会丢失数据。比如每 5 秒钟,统计过去 3 秒的通过路口汽车的数据,将会漏掉 2 秒钟的数据。

通过组合可以得出四种基本窗口:

  • time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))
  • time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))
  • count-tumbling-window 无重叠数据的数量窗口,设置方式举例:countWindow(5)
  • count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)
  • 固定延迟重启策略。表示在作业失败后会进行固定延迟的重启,直到达到最大重启次数或作业成功执行。该重启策略适用于故障是暂时性的,可以通过重试来解决的情况。
  • 故障率重启策略。根据作业的最近一段时间的故障率来判断是否进行重启。当故障率超过设定的阈值时,作业会进行重启。
  • 没有重启策略。作业遇到故障后直接失败并停止。这种策略适合于批处理作业或者不需要容错机制的流处理作业。
  • Fallback 重启策略(默认是这个 Fallback 重启策略)。它根据指定的重启次数和重启延迟时间,进行固定次数的重启,如果重启次数用完后仍然失败,则作业失败。
  • 事件时间(event time):数据本身真正产生时间,(生产环境中用这个)。
  • 摄取时间(ingestion time):Flink 读取数据时的时间。
    • 适用场景:存在多个 Source Operator 的情况下,每个 Source Operator 可以使用自己本地系统时钟指派 Ingestion Time。后续基于时间相关的各种操作,都会使用数据记录中的 Ingestion Time
  • 处理时间(processing time):Flink 中算子处理数据的时间。
    • 适用场景:没有事件时间的情况下,或者对实时性要求超高的情况。

本道面试题考察的其实就是一句话:Flink 的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了 DataSet API 和 DataStream API。

在一个 Flink Job 中,数据需要在不同的 task 中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲 buffer 中收集 records,然后再发送。

Records 并不是一个一个被发送的,而是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。

Flink 可以不依赖 Hadoop 组件执行,例如可以跑单机版(但是实际生产环境使用通常是提交 Flink 到 yarn 上运行,生产是需要 Hadoop 组件的)

但是做为大数据的基础设施,Hadoop 体系是任何大数据框架都绕不过去的。Flink 可以集成众多 Hadooop 组件,例如 Yarn、Hbase、HDFS 等等。

  • Flink 可以和 Yarn 集成做资源调度,也可以读写 HDFS。
  • Flink 利用 HDFS 做检查点。

Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。

  • 托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;
  • 原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

托管状态分为算子状态(Operator State)和按键分区状态(Keyed State)。我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink 能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。

  • 算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。
  • 按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以 key 为作用范围进行隔离。需要注意,使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。

实际生产中通常使用 Keyed State 中的 ValueState、MapState、ListState。

存在状态后端

1.13 版本之前有三种

  • MemoryStateBackend:开发时使用

    • 将 state 保存在 TaskManager 的 Java 堆上。
    • checkpoint 时,会对当前的状态进行快照,并且将其作为 checkpoint ACK 消息的一部分发送给 JobManager(master)。即把 state 的快照数据保存到 JobManager 的内存中。
    • 状态的总大小不能超过 JobManager 的内存
  • FsStateBackend:生产时使用,常用

    • state 数据仍是存储在 TaskManager 的内存中的
    • checkpoint 时,会将 state 的快照数据写入到指定文件系统(hdfs,s3)上,同时会在 JobManager 的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。
    • 基于文件系统进行存储,需要配置存储的文件系统,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。
    • 容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。
  • RocksDBStateBackend:生产时使用,非常大的状态时用

    • state 直接写入本地 RocksDB 中(默认存储在 TaskManager 的 data 目录下)。
    • checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)
    • RocksDBStateBackend 是目前唯一提供增量 checkpoint 的状态后端。
    • 速度比较慢。

1.13 版本之后合并为两种

  • HashMapStateBackend:哈希表状态后端,相当于 MemoryStateBackend 和 FsStateBackend,默认值
    HashMapStateBackend 是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

  • EmbeddedRocksDBStateBackend 内嵌 RocksDB 状态后端,生产时使用,非常大的状态时用

公司怎么提交的实时任务,有多少 Job Manager?★★★★★

Flink 有三种部署模式:

  • 会话模式(Session Mode)。会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。

  • 单作业模式(Per-Job Mode)。会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。

  • 应用模式(Application Mode)。
    前面两种模式,应用代码都是客户端上执行,然后由客户端提交给 JobManager 的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给 JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗
    所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManager 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式

Flink 提交有几种模式

  • yarn-session 模式。
  • Application 模式。
  • Per-Job 模式。
  1. 我们使用 yarn session 模式提交任务。每次提交都会创建一个新的 Flink 集群,为每一个 job 提供一个 yarn-session,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。线上命令脚本如下:

    bin/yarn-session.sh -n 7 -s 8 -jm 3072 -tm 32768 -qu root.*.* -nm *-* -d 其中申请 7 个 taskManager,每个 8 核,每个 taskmanager 有 32768M 内存。

  2. 集群默认只有一个 Job Manager。但为了防止单点故障,我们配置了高可用。我们公司一般配置一个主 Job Manager,两个备用 Job Manager,然后结合 ZooKeeper 的使用,来达到高可用。

Reference


01-Flink 基础
https://flepeng.github.io/interview-44-数据处理-44-Flink-01-Flink-基础/
作者
Lepeng
发布于
2020年8月8日
许可协议