Kafka 最多一次、至少一次、精确传递一次
首先了解一下消息传递语义(message delivery semantic)。这是一个通用的概念,也就是消息传递过程中消息传递的保证性。分为三种:
最多一次(at most once):消息生产者发送消息,但不确保消息是否成功到达,消息可能丢失,也可能被处理,但最多只会被处理一次,不会重复。
- 特点:可能丢失、不会重复
- Kafka 中,生产者在无 ACK 的机制下保证消息最多发送一次。
至少一次(at least once):消息不会丢失,但可能被处理多次。
- 特点:可能重复、不会丢失。
- Kafka 中,生产者使用 ACK 机制确保消息至少传递一次。即使生产者收到了部分的 ACK,它也可能会重新发送消息,导致消息可能被消费者多次处理。
精确一次又叫恰好一次(exactly once):消息 被处理 且只会被处理一次。
- 特点:不丢失、不重复、就一次。
- Kafka 中,生产者使用 ACK 机制配合幂等性和事务特性,以实现精确一次处理的语义。
而 Kafka 其实有两次消息传递,一次生产者发送消息给 Kafka,一次消费者去 Kafka 消费消息。
两次传递都会影响最终结果,两次都是精确一次,最终结果才是精确一次。两次中有一次会丢失消息,或者有一次会重复,那么最终的结果就是可能丢失或者重复的。
1、Produce 端消息传递
Produce 端的代码
1 |
|
参数 acks 有三个值选择:
0
:producer 完全不管 broker 的处理结果,回调也就没有用了,这个参数不能保证消息成功发送。但是这种吞吐量最高。all 或者 -1
:leader broker 会等消息写入,并且 ISR 都写入后才会响应,这种只要 ISR 有副本存活就肯定不会丢失,但吞吐量最低。1
:默认值。leader broker 自己写入后就响应,不会等待 ISR 其他的副本写入,只要 leader broker 存活就不会丢失,即保证了不丢失,也保证了吞吐量。
设置为 0 时,实现了 at most once。
设置为 1 时,消息不会丢失。但是有一种情况:消息成功写入,而这个时候由于网络问题 producer 没有收到写入成功的响应,producer 就会开启重试的操作,直到网络恢复,消息就发送了多次。这就是 at least once。
Kafka producer acks 的默认值为 1,所以默认的 producer 级别是 at least once。并不能 exactly once。
2、Consumer 端消息传递
consumer 是靠 offset 保证消息传递的。
consumer 消费的代码如下:
1 |
|
其中有一个参数是 enable.auto.commit
- 设置为 true。 consumer 在消费之前提交位移,实现了 at most once。默认值。
- 设置为 false。consumer 在消费后提交,实现了 at least once。
Kafka consumer 的 enable.auto.commit 的默认值为 true,所以默认的 consumer 级别是 at most once。也并不能 exactly once。
3、精确一次 producer
通过了解 producer 端与 consumer 端的设置,我们发现 Kafka 在两端的配置都不能做到 exactly once,好像 Kafka 的消息一定会丢失或者重复的,是不是没有办法做到 exactly once 了呢?
在 Kafka 0.11.0.0 版本之前 producer 端确实是不可能的,但是在 Kafka 0.11.0.0 版本之后,Kafka 正式推出了 idempotent producer。
Kafka 分别通过 幂等性(Idempotence)和事务(Transaction)这两种机制实现了 精确一次(exactly once)语义。
3.1、幂等性(Idempotence)
幂等
这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。
幂等性最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。
Kafka 中,Producer 默认不是幂等性的,如果开启仅需设置一个参数即可,即
1 |
|
enable.idempotence
被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。
生产者客户端
Kafka 为了实现幂等性,它在底层设计架构中引入了 ProducerID 和 SequenceNumber。
- PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
- Sequence Numbler。(对于每个 PID,该 Producer 发送数据的每个
<Topic, Partition>
都对应一个从 0 开始单调递增的 Sequence Number。
生产者每发送一条消息就会将 <PID,分区>
对应的序列号的值 + 1。
服务端 Broker
Broker 端也会为每一对 <PID, 分区>
维护一个序列号,并且每次 Commit 一条消息时将其对应序号递增。
对于收到的每一条消息,会 判断 Broker 端的序列号 SN_old
和 接收到消息中的序列号 SN_new
进行对比;
只有
SN_new = SN_old + 1
,Broker 才会接收它。如果
SN_new < SN_old + 1
,说明消息被重复写入,Broker 可以直接将其丢弃。如果
SN_new > SN_old + 1
,说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出 OutOfOrderSequenceException 异常,这是一个严重的异常,后续的诸如 send() beginTransaction() commitTransaction 等方法的调用都会抛出 IllegalStateException 异常
注意:引入序列号来实现幂等也只是针对每一对 <PID,分区>
而言的,也就是说,Kafka 的幂等只能保证单个生产者会话 session 中单个分区的幂等。
实现比较简单,同样的限制也比较大:
- 首先,它只能保证单分区上的幂等性。即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
- 因为 SequenceNumber 是以 Topic + Partition 为单位单调递增的,如果一条消息被发送到了多个分区必然会分配到不同的 SequenceNumber,导致重复问题。
- 其次,它只能实现单会话上的幂等性。不能实现跨会话的幂等性。当你重启 Producer 进程之后,这种幂等性保证就丧失了。
- 重启 Producer 后会分配一个新的 ProducerID,相当于之前保存的 SequenceNumber 就丢失了。
3.2、事务(Transaction)
Kafka 的事务概念类似于我们熟知的数据库提供的事务。
Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启
enable.idempotence = true
。 - 设置 Producer 端参数
transactional.id
。最好为其设置一个有意义的名字。
此外,你还需要在 Producer 代码中做一些调整,如下:
1 |
|
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。
实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level
参数的值即可。当前这个参数有两个取值:
read_uncommitted
:默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。- 很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
read_committed
:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。- 当然了,它也能看到非事务型 Producer 写入的所有消息。
4、精确一次 consumer
上面 producer 端实现了 exactly once,那么 consumer 端呢?
consumer 端由于可能无法消费事务中所有消息,并且消息可能被删除,所以事务并不能解决 consumer 端exactly once 的问题,我们可能还是需要自己处理这方面的逻辑。比如自己管理 offset 的提交,不要自动提交,也是可以实现 exactly once 的。
还有一个选择就是使用 Kafka 自己的流处理引擎,也就是 Kafka Streams,
设置 processing.guarantee=exactly_once
,就可以轻松实现 exactly once 了。
Reference
- https://cloud.tencent.com/developer/article/1494140
- https://blog.csdn.net/weixin_47467016/article/details/134602077
- https://blog.csdn.net/jy02268879/article/details/106023273
- https://www.jianshu.com/p/b1599f46229b
- https://www.infoq.cn/article/kafka-analysis-part-8
- https://zhuanlan.zhihu.com/p/668156196