01-Kafka 基础

官网保平安:https://kafka.apache.org/

什么是 Kafka

Kafka 起初是由 Linkedin 公司采用 Scala 语言开发的一个 多分区、多副本 且基于 ZooKeeper 协调的分布式消息系统,现己被捐献给 Apache 基金会。

目前 Kafka 已经定位为一个 分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

发布订阅的消息系统那么多,为啥选择 Kafka

  1. 多个生产者

    KafKa 可以无缝地支持多个生产者,不管客户端使用一个主题,还是多个主题。Kafka 适合从多个前端系统收集数据,并以统一的格式对外提供数据。

  2. 多个消费者

    Kafka 支持多个消费者从一个单独的消息流中读取数据,并且消费者之间互不影响。这与其他队列系统不同,其他队列系统一旦被客户端读取,其他客户端就不能再读取它。并且多个消费者可以组成一个消费者组,他们共享一个消息流,并保证消费者组对每个给定的消息只消费一次

  3. 基于磁盘的数据存储

    Kafka 允许消费者非实时地读取消息,原因在于 Kafka 将消息提交到磁盘上,设置了保留规则进行保存,无需担心消息丢失等问题。

  4. 伸缩性

    可扩展多台 Broker。用户可以先使用单个 Broker,到后面可以扩展到多个 Broker。

  5. 高性能(重要原因)

    Kafka 可以轻松处理百万千万级消息流,同时还能保证 亚秒级 的消息延迟。

为什么需要消息系统,MySQL 不能满足需求吗

  1. MySQL 所有的数据都要写入到磁盘,效率没有 Kafka 高。
  2. MySQL 没有生产者和消费者的概念,需要自己维护消费者和偏移量,并且 MySQL 没有消费者组的概念。
  3. MySQL 中所有的数据删除都需要自己做。

Kafka 缺点

  1. 由于是批量发送,数据并非真正的实时;
  2. 对于 mqtt 协议不支持;
  3. 不支持物联网传感数据直接接入;
  4. 仅支持统一分区内消息有序,无法实现全局消息有序;
  5. 监控不完善,需要安装插件;
  6. 依赖 zookeeper 进行元数据管理。

Kafka 的架构 ★★★

如下图所示:

Kafak 总体架构图中包含多个概念:

  1. ZooKeeper:Zookeeper 负责保存 Broker 集群元数据,并对控制器进行选举等操作。

  2. Producer:生产者。生产者负责创建消息,将消息发送到 Broker。

  3. Broker:一个独立的 Kafka 服务器被称作 Broker。Broker 负责接收来自生产者的消息,为消息设置偏移量,并将消息存储在磁盘。Broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。

  4. Consumer:消费者。消费者负责从 Broker 订阅并消费消息。接受 Broker 使用 pull(拉) 模式,默认 100ms 拉一次。Consumer 消费的是 Partition 的数据。

  5. Consumer Group:消费者组。多个或一个消费者可以构成一个消费组,一个 Partition 只能被消费组中的一个消费者所消费。但是多个消费者组可以消费同一个 Partition。同一消费者组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消费消息时互不影响。
    使用 多分区 + 多消费者 方式可以极大提高数据下游的处理速度,Kafka 就是通过消费者组的方式来实现消息 P2P 模式和广播模式。

  6. Topic:主题。Kafka 中的消息 以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并消费。

  7. Partition:分区。一个 Topic 可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的 日志(Log)文件,消息在 被追加到分区日志文件 的时候都会分配一个特定的 偏移量(offset)

  8. Offset:偏移。Offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 Offset 并不跨越分区,也就是说,Kafka保证的是分区有序性而不是主题有序性

  9. Record:实际写入 Kafka 中并可以被读取的消息记录。每个 Record 包含了 keyvaluetimestamp

  10. Replication:副本。是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有 主副本对外提供读写服务
    当主副本所在 Broker 崩溃或发生网络异常,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。

  11. Leader:副本领导节点。每一个 Partition 都有对应的 Leader 节点。Producer 写数据只会往 Leader 中写。Consumer 读数据也只从 Leader 中读。

  12. Follower:副本跟随节点。实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

什么是副本

Kafka 为了保证数据不丢失,从 0.8.0 版本开始引入了分区副本机制。在创建 Topic 的时候指定 replication-factor,默认副本为 3。

副本是相对 分区 而言的,一个分区包含一个或多个副本,其中一个为 Leader 副本,其余为 Follower 副本,各个副本位于不同的 Broker 节点中。

所有的 读写操作 都是通过 Leader 进行的,同时 Follower 会定期地去 Leader 上复制数据。当 Leader 挂掉之后,其中一个 Follower 会重新成为新的 Leader。

通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本 可以使 Kafka 在发生崩溃时仍能保证消息的持久性。

Kafka 的 ISR 机制 ★★★

  • ISR(In-Sync Replicas):副本同步队列。
  • AR(Assigned Replicas):分区的所有副本。

ISR 是指与 Leader 副本保持同步状态的副本集合(或者说副本列表)。每个 Partition 都会有一个 ISR,由 Leader 动态维护。当然 Leader 副本本身也是这个集合中的一员。

当 ISR 中的 Follower 完成数据同步之后,Leader 就会给 Follower 发送 ack,如果其中一个 Follower 长时间未向 Leader 同步数据,该 Follower 将会被踢出 ISR 集合,或者一个 Follower 比 Leader 落后太多,也会被踢出去。当 Leader 发生故障后,会从 ISR 集合中重新选举出新的 Leader。

1
2
3
4
5
6
7
8
9
10
11
# 如果 Leader 发现 Follower 超过 10 秒没有向它发起 fech 请求, 那么 Leader 考虑这个 Follower 是不是程序出了点问题
# 或者资源紧张调度不过来,它太慢了,不希望它拖慢后面的进度,所以就会把它从 ISR 中移除
rerplica.lag.time.max.ms=10000


# 相差4000条就移除
rerplica.lag.max.messages=4000


# 需要保证 ISR 中至少有多少个 replicas
min.insync.replicas=1

Kafka 如何做到高吞吐量和性能的 ★★★

Kafka 实现高吞吐量和性能,主要通过以下几点:

1、页缓存技术

Kafka 是基于操作系统的 页缓存 来实现文件写入的。

操作系统本身有一层缓存,叫做 page cache,是在 内存里的缓存,我们也可以称之为 os cache,意思就是操作系统自己管理的缓存。

Kafka 在写入磁盘文件的时候,可以直接写入 os cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入磁盘文件中。

通过这一个步骤,就可以将磁盘文件 写性能 提升很多了,因为其实这里相当于是在写内存,不是在写磁盘,原理图如下:

2、磁盘顺序写

另一个主要功能是 Kafka 写数据的时候,是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。

普通的机械磁盘如果你要是随机写的话,确实性能极差,也就是随便找到文件的某个位置来写数据。

但是如果你是 追加文件末尾 按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能相差无几。

基于上面两点,Kafka 就实现了写入数据的超高性能

3、零拷贝

传统的数据文件拷贝过程如下图所示,大概可以分成四个过程:

  1. 操作系统将数据从磁盘中加载到内核空间的 Read Buffer(页缓存区)中。
  2. 应用程序将 Read Buffer 中的数据拷贝到应用空间的应用缓冲区中。
  3. 应用程序将应用缓冲区的数据拷贝到内核的 Socket Buffer 中。
  4. 操作系统将数据从 Socket Buffer 中发送到网卡,通过网卡发送给数据接收方。

在这个过程中,可以发现,数据从磁盘到最终发出去,要经历 4 次拷贝,而在这四次拷贝过程中,有两次拷贝是浪费的。

  1. 从内核空间拷贝到用户空间。
  2. 从用户空间再次拷贝到内核空间。

除此之外,由于用户空间和内核空间的切换,会带来 CPU 上下文切换,对于 CPU 的性能也会造成影响,而所谓的零拷贝,就是把这两次多余的拷贝忽略掉,应用程序可以直接把磁盘中的数据,从内核中直接传输到 Socket,而不需要再次经过应用程序所在的用户空间。

零拷贝通过 DMA(Direct Memory Access)技术实现,过程如下:

  1. 把文件内容复制到内核空间中的 Read Buffer。
  2. 接着把包含数据长度和位置的信息文件描述符,加载到 Socket Buffer 中,DMA 引擎直接可以把数据从内核空间传递到网卡中。

所以,所谓的零拷贝并不是完全没有数据的拷贝,只是 相对用户空间 来说,不需要进行数据的拷贝。只是减少了不必要的拷贝次数而已。

Kafka 在读数据的时候引入零拷贝技术。 Kafka 直接让操作系统的 cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,如下图所示:

Kafka 从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。

Kafka 集群经过良好的调优,数据直接写入 os cache 中,然后读数据的时候也是从 os cache 中读。相当于 Kafka 完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。

4、批发送

批处理是一种常用的用于提高 I/O 性能的方式。对 Kafka 而言,批处理既减少了网络传输的 Overhead,又提高了写磁盘的效率。Kafka 0.82 之后是将多个消息合并之后再发送,而并不是 send 一条就立马发送(之前支持)。

5、数据压缩

数据压缩的一个基本原理是,重复数据越多压缩效果越好。因此将整个 Batch 的数据一起压缩能更大幅度减小数据量,从而更大程度提高网络传输效率。

1
2
3
4
5
6
7
# 批量发送的基本单位, 默认是 16384 Bytes, 即16kB
batch.size

# 延迟时间
linger.ms

# 两者满足其一便发送

Broker 接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘,Consumer 接受到压缩后的数据再解压缩。

整体来讲: Producer 到 Broker,副本复制,Broker 到 Consumer 的数据都是压缩后的数据,保证高效率的传输。

日志索引

Kafka 能支撑 TB 级别数据,在日志级别有两个原因: 顺序写和日志索引。

Kafka 在一个日志文件达到一定数据量 (1G) 之后,会生成新的日志文件,大数据情况下会有多个日志文件,通过偏移量来确定到某行纪录时,如果遍历所有的日志文件,那效率自然是很差的。Kafka 在日志级别上抽出来一层日志索引,来方便根据 offset 快速定位到是某个日志文件。

每一个 partition 对应多个 log 文件(最大 1G),每一个 log 文件又对应一个 index 文件通过 offset 查找 Message 流程:

  1. 先根据 offset(例: 368773),二分定位到最大小于等于该 offset 的 index 文件(368769.index)
  2. 通过二分(368773 - 368769 = 4)定位到 index 文件 (368769.index) 中最大 小于等于 该 offset 的对于的 log 文件偏移量(3, 497)
  3. 通过定位到该文件的消息行(3, 497),然后在往后一行一行匹配揭露(368773 830)。

生产者向 Kafka 发送消息的执行流程

如下图所示:

  1. 生产者要往 Kafka 发送消息时,需要创建 ProducerRecoder,代码如下:

    1
    2
    3
    4
    5
    6
    ProducerRecord<String,String> record = new ProducerRecoder<>("CostomerCountry","Precision Products","France");
    try{
    producer.send(record);
    }catch(Exception e){
    e.printStackTrace();
    }
  2. ProducerRecoder 对象会包含目标 Topic、分区内容、以及指定的 key 和 value,在发送 ProducerRecoder 时,生产者会先把键和值对象序列化成字节数组,然后在网络上传输。

  3. 生产者在将消息发送到某个 Topic 之前,需要经过拦截器、序列化器和分区器(Partitioner)

    • 拦截器:发送前准备:过滤、修改消息,发送回调前:统计。
    • 序列化器:对象转换成字节数组发送给 Kafka。
    • 分区器:根据 key 计算 partition。
  4. 如果消息 ProducerRecord 没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区

    1. 若没有指定分区,且消息的 key 不为空,则使用 murmur 的 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。
    2. 若没有指定分区,且消息的 key 也是空,则用轮询的方式选择一个分区。
  5. 分区选择好之后,会将消息添加到一个记录批次中,这个批次的所有消息都会被发送到相同的 Topic 和 partition 上。然后会有一个独立的线程负责把这些记录批次发送到相应的 Broker 中。

  6. Broker 接收到消息后,会作出一个响应。如果成功写入 Kafka 中,就返回一个 RecordMetaData 对象,它包含 Topic 和 Partition 信息,以及记录在分区的 offset

  7. 若写入失败,就返回一个错误异常,生产者在收到错误之后尝试重新发送消息,几次之后如果还失败,就返回错误信息。

Producer 发送的一条 message 中包含哪些信息

消息由 可变长度的报头、可变长度的不透明密钥字节数组和可变长度的不透明值字节数组 组成。

RecordBatch 是 Kafka 数据的存储单元,一个 RecordBatch 中包含多个 Record(即我们通常说的一条消息)。而每条消息又可以包含多个 Header 信息,Header 是 Key-Value 形式的。RecordBatch 中各个字段的含义如下:

Kafka 文件存储机制

如下图所示:

在 Kafka 中,一个 Topic 会被分割成多个 Partition,而 Partition 由多个更小的 Segment 的元素组成。

一个 Partition 下会包含下图的一些文件,由 log、index、timeindex 三个文件组成一个 Segment而文件名中的(0)表示的是一个 Segment 的起始 Offset

Kafka 会根据 log.segment.bytes 的配置来决定单个 Segment 文件(log)的大小,当写入数据达到这个大小时就会创建新的 Segment 。

(1)log 文件解析示意图

(2)index 文件解析示意图

(3)timeindex 文件解析示意图

log、index、timeindex 中存储的都是 二进制 的数据(log 中存储的是 BatchRecords 消息内容,而 index 和 timeindex 分别是一些索引信息)。

举例:现在创建一个 lyz Topic,三个分区,一个副本。

1
2
3
4
5
# 创建一个 Topic
[lepeng@centos ~]# ./kafka-topics.sh --create --bootstrap-server 192.168.244.129:9092 --replication-factor 1 --partitions 3 --topic lyz

# 查看创建的 Topic 描述信息:
[lepeng@centos ~]# ./kafka-topics.sh --describe --bootstrap-server 192.168.244.129:9092 --topic lyz

截图如下:

那么,其数据文件的目录结构如下所示,3 个分区对应 3 个文件夹,文件夹命名 topic-分区序号:即 lyz-0,lyz-1,lyz-2

进入其中一个分区的文件夹中,会有 3 种类型文件。

如何根据 offset 找到对应的 Message

通过 Offset 从存储层中获取 Message 大致分为两步:

  1. 第一步,根据 Offset 找到所属的 Segment 文件。可以直接根据 Segment 的文件名进行查找(上面已经介绍了 Segment 的文件就是它包含的数据的起始 Offset)

  2. 第二步,从 Segment 中获取对应 Offset 的消息数据。这步需要一些索引信息来快速定位目标数据在 Segment 中的位置,否则就要读取整个 Segment 文件了,这里需要的索引信息就是上面的 index 文件存储的内容。

index 文件中包含多个索引条目,每个条目表示数据文件中一条 Message 的索引。索引包含两个部分(均为4个字节的数字),分别为相对 offset 和 position,如下内容所示:

1
2
3
4
5
6
[lepeng@centos ~]# ./kafka-run-class.sh  kafka.tools.DumpLogSegments --files ../kafkaLog/lyz-0/00000000000000000000.index  --print-data-log

offset: 9 position: 13713
offset: 15 position: 27799
offset: 21 position: 43149
offset: 27 position: 58432

如下图所示:

index 文件中存储的是 Offset 和 Position(Offset 对应的消息在 log 文件中的偏移量)的对应关系,这样当有 Offset 时可以快速定位到 Position 读取 BatchRecord ,然后再从 BatchRecord 中获取某一条消息。

比如上述 Offset21 会被定位到 27 这个 BatchRecord,然后再从这个 BatchRecord 中取出第二个 Record(27 这个 BatchRecord 包含了 27 、28 两个 Record)。

Kafka 并不会为每个 Record 都保存一个索引,而是根据 log.index.interval.bytes 等配置 构建稀疏索引信息

Kafka 中还维护了 timeindex,保存了 Timestamp 和 Offset 的关系,在一些场景需要根据 timestamp 来定位消息。timeindex 中的一个(timestampX,offsetY)元素的含义是所有创建时间大于 timestampX 的消息的 Offset 都大于 offsetY

查找方式如下图所示

Kafka 如何实现消息是有序的 ★★★★★

生产者:分区的 Leader 副本负责数据以先进先出的顺序写入,保证消息写入时有序。

消费者:同一个分区内的消息只能被一个 group 里的一个消费者消费,保证分区内消费有序。

整个 Kafka 不保证有序。如果为了保证 Kafka 全局有序,那么设置 一个生产者,一个分区,一个消费者。

Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。

总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:

  1. 1 个 Topic 只对应一个 Partition。
  2. (推荐)发送消息的时候指定 key/Partition。

Kafka 生产者消息分区算法

Kafka 包含三种分区算法:

(1)轮询策略

也称 Round-robin 策略,即顺序分配。比如一个 Topic 下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第四条消息时又会重新开始。

轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是平时最常用的分区策略之一。

(2)随机策略

也称 Randomness 策略。所谓随机就是我们随意地将消息放置在任意一个分区上,如下图:

(3)按 key 分配策略

Kafka 允许为每条消息定义消息键,简称为 key。一旦消息被定义了 key,那么你就可以保证同一个 key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,所以每个 key 下的消息也是有序的。如下图所示:

Kafka 分区数可以增加或减少吗?为什么

我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。 Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。

分片规则(不同的副本对应的 Broker)

Kafka 分配 Replicas 的算法有两种: RangeAssignor 和 RoundRobinAssignor,默认为 RangeAssignor:

  1. 将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序。
  2. 将第 i 个 Partition 分配到第 i mod n 个 Broker 上。
  3. 将第 i 个 Partition 的第 j 个 Replica 分配到第 (i + j) mod n 个 Broker 上。

Kafka 的消费者组跟分区之间有什么关系 ★★★★★

Kafka 通过消费者组管理消费者,假设一个主题中包含 4 个分区:

  1. 在一个消费者组中只有一个消费者,那消费者将收到全部 4 个分区的消息。

  2. 如果存在两个消费者,那么四个分区将根据分区分配策略分配个两个消费者。

  3. 如果存在四个消费者,将平均分配,每个消费者消费一个分区。

  4. 如果存在五个消费者,就会出现消费者数量多于分区数量,那么多余的消费者将会被闲置,不会接收到任何信息。

Kafka 消费者分区分配策略

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。

  • RangeAssignor 范围分配:Kafka 默认的分配策略,针对每个消费者的每个 Topic 进行范围分配。即每个消费者消费一定范围的分区,尽量地实现将分区均分给不同的消费者,如果不能均分,消费者由 Kafka 自动编号,优先将分区分配给消费者编号比较小的消费者。并且每个消费者消费的范围是连续的。

  • RoundRobinAssignor 轮询分配:Kafka 2.0 版本之前使用。按字面意思,Kafka 会根据 Topic 订阅来轮询分配,将所有的分区按 Topic 名称和分区编号进行排序,轮询分配给每个消费者(如果第一轮轮询结束,会接着上一轮轮询分配的结果继续轮询分配)。

  • StickyAssignor 黏性分配:Kafka 2.0 版本之后建议使用。
    优点:黏性分配除了考虑分配的均衡度以外,重分配的效率更高。
    规则:类似于轮询分配,尽量地将分区均衡地分配给消费者。

Kafka 的默认消息保留策略

Broker 默认的消息保留策略分为两种:

  1. 日志片段通过 log.segment.bytes 配置(默认是 1GB)
  2. 日志片段通过 log.segment.ms 配置 (默认 7 天)

Kafka 是如何清理过期数据的

Kafka 将数据持久化到了硬盘上之后,可以配置一定的策略对数据进行清理,清理的策略有两个:删除和压缩

1、删除

log.cleanup.policy=delete 启用删除策略。

直接删除,删除后的消息不可恢复。可配置以下两个策略:

1
2
3
4
5
# 清理超过指定时间清理:  
log.retention.hours=16

# 超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824

为了避免在删除时阻塞读操作,采用了 copy-on-write 形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于 Java 的 CopyOnWriteArrayList。

2、压缩

将数据压缩,只保留每个 key 最后一个版本的数据。

首先在 Broker 的配置中设置 log.cleaner.enable=true 启用 cleaner,这个默认是关闭的。

在 Topic 的配置中设置 log.cleanup.policy=compact 启用压缩策略。

Kafka 如何实现单个集群间的消息复制

Kafka 消息复制机制只能在单个集群中进行复制,不能在多个集群之间进行。

Kafka 提供了一个叫做 MirrorMaker 的核心组件,该组件包含一个生产者和一个消费者,两者之间通过一个队列进行相连,当消费者从一个集群读取消息,生产者把消息发送到另一个集群。

Kafka 的 unclean 是啥

Kafka 在 Broker 端提供了一个配置参数 unclean.leader.election 这个参数有两个值:

  • true(默认):允许不同步副本成为 Leader,由于不同步副本的消息较为滞后,此时成为 Leader,可能会出现消息不一致的情况。
  • false:不允许不同步副本成为 Leader,此时如果 ISR 列表为空,会一直等待旧 Leader 恢复,降低了可用性。

优先副本是什么?它有什么特殊的作用

优先副本:是默认的 Leader 副本,发生 Leader 变化时,重选举会优先选择优先副本作为 Leader。

如何进行 Replication Leader 选举 ★★★

当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,只有 ISR 里的成员才有被选为 Leader 的可能。Kafka 会从 ISR 列表中选择 第一个 Follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。

如果 ISR 全部宕机,则选择 第一个恢复的副本 当做 Leader 节点 (消息可能会丢失或者重复消费)。

如何进行 Broker Leader 选举

  1. 在 Kafka 集群中,会有多个 Broker 节点,集群中第一个启动的 Broker 会通过在 zookeeper 中创建临时节点 /controller 来让自己成为控制器,其他 Broker 启动时也会在 zookeeper 中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在 zookeeper 中创建 watch 对象,便于它们收到控制器变更的通知。

  2. 如果集群中有一个 Broker 发生异常退出了,那么控制器就会检查这个 Broker 是否有分区的 Replication Leader,如果有,那么这个分区就需要一个新的 Replication Leader,此时控制器就会去遍历其他副本,决定哪一个成为新的 Replication Leader,同时更新分区的 ISR 集合。

  3. 如果有一个 Broker 加入集群中,那么控制器就会通过 Broker ID 去判断新加入的 Broker 中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。

  4. 集群中每选举一次控制器,就会通过 zookeeper 创建一个 controller epoch,每一个选举都会创建一个更大,包含最新信息的 epoch,如果有 Broker 收到比这个 epoch 旧的数据,就会忽略它们,Kafka 也通过这个 epoch 来防止集群产生“脑裂”。

LEO、HW、LSO、LW 分别代表什么?★★★

  • LEO:LogEndOffset 的简称,日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值,副本写入消息的时候,会自动更新 LEO 值,Leader 会保存两个 LEO 值,一个是自己的 LEO 值,另外一个是 remote 的 LEO 值。Follower 每次 fetch 请求都会携带当前 LEO,Leader 会选择最小的 LEO 来更新 HW。

  • HW:水位或水印一词,也可称为高水位(high watermark),通常被用在流式处理领域(flink、spark),以表征元素或事件在基于时间层面上的进展。
    在 Kafka 中,水位的概念与时间无关,而是与位置信息相关。严格来说,它表示的就是位置信息,即位移(offset)。取 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的上一条信息。
    HW 一定不会大于 LEO 值,小于 HW 值的消息被认为是”已提交”或”已备份”的消息,并对消费者可见。

  • LSO: LastStableOffset 的简称,对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW。

  • LW: Low Watermark 低水位,代表 AR 集合中最小的 logStartOffset 值。

Kafka 为什么不支持读写分离

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 Leader 进行交互的,不会与 Follower 进行交互,Follower 只是为了保证消息的可用性,从而实现的是一种主写主读的生产消费模型。

Kafka 不支持主写从读,因为主写从读有 2 个很明显的缺点:

  1. 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X,之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。

  2. 延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

Rebalance

消费端的 Rebalance 就是让一个消费组内的所有消费者就如何消费 Topic 的所有分区达成共识的过程。在 Rebalance 过程中,所有的 Consumer 实例都会停止消费,等待 Rebalance 的完成,严重影响消费端的 TPS,所以应当尽量避免。

Coordinator

Group Coordinator 是一个服务,每个 Broker 在启动的时候都会启动一个该服务。Group Coordinator 的作用是用来存储 Group 的相关 Meta 信息,并将对应 Partition 的 Offset 信息记录到 Kafka 内置 Topic(__consumer_offsets) 中。

在 Kafka 0.9 之前是基于 Zookeeper 来存储 Partition 的 offset 信息(consumers/{group}/offsets/{topic}/{partition}),因为 Zookeeper 并不适用于频繁的写操作,所以在 0.9 之后通过内置 Topic 的方式来记录对应 Partition 的 offset。

Rebalace 流程

Rebalance 过程分为两步:Join 和 Sync

  1. Join: 顾名思义就是加入组。这一步中,所有 Consumer 都向 Coordinator 发送 JoinGroup 请求,请求加入消费组。一旦所有成员都发送了 JoinGroup 请求,Coordinator 会从中选择一个 Consumer 担任 Leader 的角色,并把组成员信息以及订阅信息发给 Consumer Leader。
    注意:Consumer Leader 和 Coordinator不是一个概念。Consumer Leader 负责消费分配方案的制定。

  2. Sync: Consumer Leader 开始分配消费方案,即哪个 Consumer 负责消费哪些 Topic 的哪些 Partition。一旦完成分配,Leader 会将这个方案封装进 SyncGroup 请求中发给 Coordinator,非 Leader 也会发 SyncGroup 请求,只是内容为空。
    Coordinator 接收到分配方案之后会把方案塞进 SyncGroup 的 Response 中发给各个 Consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

如何避免消费端 Rebalance

Rebalance 发生的条件有三种:

  1. 消费组内的消费者成员数量发生变化。
    1. 新的消费者加入到消费组。
    2. 消费者主动退出消费组。
    3. 消费者被动下线。比如消费者长时间的 GC,网络延迟导致消费者长时间未向 Group Coordinator 发送心跳请求,均会认为该消费者已经下线并踢出。
  2. 订阅的 Topic 的 Consumer Group 个数发生变化。
  3. 消费主题的分区数量发生变化。

对于上述的 2 和 3,我们可以人为避免。1.1 和 1.2 人为也可以尽量避免,主要核心为 1.3。

正常情况下,每个消费者都会定期向组协调器发送心跳,表明自己还存活,如果消费者不能及时的发送心跳,组协调器会被认为该消费者已经“死”了,就会导致消费者离组引发 Rebalance 问题。

这里涉及到两个参数:

  1. session.timeout.msheartbeat.interval.ms:分别为组协调器认为消费组存活的期限和消费者发送心跳的时间间隔。
  2. max.poll.interval.ms:为 Consumer 两次拉取数据的最大时间间隔,超过这个时间,也会导致消费者离组。

另外,如果 Consumer 端频繁进行 FullGC(GC会回收整个堆的内存)也会导致消费端长时间停顿,引发 Rebalance。为了解决这个问题,主要从以下方面入手:

  1. 监控消费者的 GC 情况。

Kafka 目前有哪些内部 Topic,它们都有什么特征?各自的作用又是什么

__consumer_offsets 以下划线开头,保存消费组的偏移。

消费者提交消费位移时提交的是当前消费到的最新消息的 offset 还是 offset+1

offset+1

Kafka 使用的是推模式还是拉模式 ★★★★★

首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。

  • 推(push)模式指的是:消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。
  • 拉(pull)模式指的是:Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。Kafka 选择了拉模式。

Kafka 在拉请求中有参数,可以使得消费者请求在 长轮询 中阻塞等待。

简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。

并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。

pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。pull 模式不足之处是,如果 Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

一些场景

如何保证每个应用程序都可以获取到 Kafka 主题中的所有消息,而不是部分消息。

为每个应用程序创建一个消费者组,然后往组中添加消费者来伸缩读取能力和处理能力,每个群组消费主题中的消息时,互不干扰。

如何实现 Kafka 消费者每次只消费指定数量的消息。

写一个队列,把 consumer 作为队列类的一个属性,然后增加一个消费计数的计数器,当到达指定数量时,关闭 consumer。

Kafka 如何实现多线程的消费

Kafka 允许同组的多个 partition 被一个 consumer 消费,但不允许一个 partition 被同组的多个 consumer 消费。

多线程消费分为两种情况:

  1. 在每个线程中新建一个 Kafka Consumer。
  2. 单线程创建 Kafka Consumer,多个处理线程处理消息,难点在于是否要考虑消息顺序性,offset 的提交方式。

Kafka 怎么做认证 ★★★

Kafka 可以设置用户名和密码。

Kafka topic 过多有什么问题 ★★★

  • 当分区数激增的时候,Kafka 的顺序写入特性会被大大破坏从而引入大量的随机I/O,因此吞吐量会下降。
    • 如果有 1000 个类似的进程顺序写 1000 个不同的文件,表面上都是在顺序写,但是对于磁盘来讲,需要不断地切换磁头到不同的文件,其实也已经表现为随机写了
    • 磁盘只有一个,单个分区看是顺序写,全局看就是随机写,需要为每个分区的顺序写来回切换。
  • topic 太多出现 too many file,打开文件过多。
  • partition 过多在 controller 选举和 controller 重新选举 partition leader 的耗时会大大增加,造成 Kafka 不可用的时间延长。

Reference


01-Kafka 基础
https://flepeng.github.io/interview-43-MQ-43-Kafka-01-Kafka-基础/
作者
Lepeng
发布于
2020年8月8日
许可协议