02-Kafka 消息传递语义
Kafka 消费者的消费模式
Kafka 消费消息时支持三种模式:
at most once
最多一次。先对获取的每一条消息进行 commit,commit 成功之后,再进行消费处理。这个配置消息可能会丢失,但不会重复。at least once
至少一次。保证每一条消息处理成功之后,然后再进行 commit。消息不会丢失,但可能会重复。exactly once
精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。
Kafka 生产者发送消息的三种方式
- 发送并忘记(Fire-and-forget):此方式只管发送,不关心消息是否都发送成功,对结果不做任何判断处理,实质上是异步发送不做回调,吞吐效率最高,无法保障可靠性。
- 同步发送(Synchronous send):此方式是同步发送,会对每条消息的结果进行判断,
future.get
会进行阻塞直到返回数据表示发送成功,才会继续下一条消息的发送,可以直到每条信息的发送情况。 - 异步发送加回调函数(Asynchronous send):在使用异步加回调的情况下,在使用 send 方法时指定一个回调函数,服务器在响应时会调用此函数,通过回调函数对结果进行处理,可以知道消息是写成功还是失败,回调函数执行完后才会结束,否则会一直阻塞。
Kafka 提供了一个参数 producer.type
来控制是同步发送还是异步发送,有两个值:
sync
:Kafka 写入到 mmap 之后就立即 flush,然后再返回 Producer 叫同步(sync)。async
:写入 mmap 之后立即返回 Producer 不调用 flush 叫异步(async)。
如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type
设置为 sync。
Kafka 生产者消息确认(ack 应答)机制
为保证 Producer 发送的数据,能可靠的到达指定的 Topic,Producer 提供了消息确认机制。生产者往 Broker 的 Topic 中发送消息时,可以通过配置来决定有几个副本收到这条消息才算消息发送成功。可以在定义 Producer 时通过 acks
参数指定,这个参数支持以下三种值:
acks=0
:Producer 不会等待任何来自 Broker 的响应。实现生产者at most once
最多一次。
特点:低延迟,高吞吐,数据可能会丢失。
如果当中出现问题,导致 Broker 没有收到消息,那么 Producer 无从得知,会造成消息丢失。acks=1(默认值)
:只要集群中 Partition Leader 节点收到消息,生产者就会收到一个来自服务器的成功响应。
在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。
如果在 Follower 同步之前,Leader 出现故障,将会丢失数据。
此时的吞吐量主要取决于使用的是 同步发送 还是 异步发送,吞吐量还受到发送中消息数量的限制,例如 Producer 在收到 Broker 响应之前可以发送多少个消息。acks=-1
:只有当所有参与复制的节点(ISR)全部都收到消息时,生产者才会收到一个来自服务器的成功响应。实现生产者端at least once
至少一次。
这种模式是最安全的,可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群依然可以运行。
如果和min.insync.replicas
参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。
根据实际的应用场景,选择设置不同的 acks
以此保证数据的可靠性。
Kafka 消费者消息确认消费机制
消费者端有一个参数:enable.auto.commit
- 设置为 true。 consumer 在消费之前提交位移,实现了 at most once。默认值。
- 设置为 false。consumer 在消费后提交,实现了 at least once。
Kafka consumer 的 enable.auto.commit 的默认值为 true,所以默认的 consumer 级别是 at most once。
Kafka 消息丢失、重复消费(消息重复)和消息漏消费
消息丢失
request.required.acks = 0
:这个配置,生产者发出消息之后,默认 ack 成功,如果此时发生网络异常,缓冲区满等情况,消息可能丢失。request.required.acks = 1
:这个配置,生产者发出消息之后,Leader 确认接收,但是在副本进行同步之前挂掉了,数据可能丢失。
解决方法:同步模式下,将属性设置为 request.required.acks = -1
;异步模式下,不设置阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态。
消息重复
生产者端:
request.required.acks = 1
这个配置,消息成功写入,而这个时候由于网络问题 producer 没有收到写入成功的响应,producer 就会开启重试的操作,直到网络恢复,消息就发送了多次。这就是 at least once。- 在 Kafka 0.11.0.0 版本之后,Kafka 正式推出了 idempotent producer。Kafka 分别通过 幂等性(Idempotence)和事务(Transaction)这两种机制实现了 精确一次(exactly once)语义。
- Kafka 为了实现幂等性,它在底层设计架构中引入了 ProducerID 和 SequenceNumber。
- PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
- Sequence Numbler。(对于每个 PID,该 Producer 发送数据的每个
<Topic, Partition>
都对应一个从 0 开始单调递增的 Sequence Number。
生产者每发送一条消息就会将<PID,分区>
对应的序列号的值 + 1。
消费者端:设置为
at least once
模式,当消费者在处理了消息但是没有 commit offset 的情况下宕掉了(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible),那么在它重启之后就会读取到之前已经处理过的数据。解决方法:解决这种情况,就要保证消费端的幂等性,综合实际的情况来考虑。比如,将数据插入到数据库中可以先判断是否已经存在,如果存在,就不做插入处理等。
消息漏消费
- 消费者端:设置为
at most once
模式:当设置成这种模式,消费者获取消息之后就会 commit offset,如果此时宕掉了(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible),就会造成消息漏消费。
Kafka 如何保证数据的不重复和不丢失
生产者端
生产者端:Kafka 分别通过 幂等性(Idempotence)和事务(Transaction)这两种机制实现了 精确一次(exactly once)语义。
幂等性(Idempotence)
幂等
这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。
Kafka 中,Producer 默认不是幂等性的,如果开启仅需设置一个参数即可,即
1 |
|
enable.idempotence
被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。
生产者客户端
Kafka 为了实现幂等性,它在底层设计架构中引入了 ProducerID 和 SequenceNumber。
- PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
- Sequence Numbler。(对于每个 PID,该 Producer 发送数据的每个
<Topic, Partition>
都对应一个从 0 开始单调递增的 Sequence Number。
生产者每发送一条消息就会将 <PID,分区>
对应的序列号的值 + 1。
服务端 Broker
Broker 端也会为每一对 <PID, 分区>
维护一个序列号,并且每次 Commit 一条消息时将其对应序号递增。
对于收到的每一条消息,会 判断 Broker 端的序列号 SN_old
和 接收到消息中的序列号 SN_new
进行对比;
- 只有
SN_new = SN_old + 1
,Broker 才会接收它。 - 如果
SN_new < SN_old + 1
,说明消息被重复写入,Broker 可以直接将其丢弃。 - 如果
SN_new > SN_old + 1
,说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出 OutOfOrderSequenceException 异常,这是一个严重的异常,后续的诸如send()/beginTransaction()/commitTransaction()
等方法的调用都会抛出 IllegalStateException 异常
注意:引入序列号来实现幂等也只是针对每一对 <PID,分区>
而言的,也就是说,Kafka 的幂等只能保证单个生产者会话 session 中单个分区的幂等。
实现比较简单,同样的限制也比较大:
- 首先,它只能保证单分区上的幂等性。即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
- 因为 SequenceNumber 是以 Topic + Partition 为单位单调递增的,如果一条消息被发送到了多个分区必然会分配到不同的 SequenceNumber,导致重复问题。
- 其次,它只能实现单会话上的幂等性。不能实现跨会话的幂等性。当你重启 Producer 进程之后,这种幂等性保证就丧失了。
- 重启 Producer 后会分配一个新的 ProducerID,相当于之前保存的 SequenceNumber 就丢失了。
Kafka 事务
Exactly-Once 语义指的是每条数据对最终结果的影响只有一次,无论期间 硬件或软件程序出现任何异常使得任务中断,将异常处理恢复后也不存在重复处理的数据或未处理数据。这种语义要求每条输入消息只会影响最终结果一次,而不是被处理一次。
Kafka 在 0.11.0.0 版本引入事务支持,事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer 事务
为了实现跨分区跨会话事务,需要引入一个全局唯一的 Transaction ID
,并将 Producer 获取的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的 Transaction ID 获取原来的 PID。
为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator
。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic
,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer 事务
上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其是无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
broker
unclean.leader.election.enable=false
关闭 unclean leader 选举,即不允许非 ISR 中的副本被选举为 leader,以避免数据丢失。
消费者端
消费者端也是 幂等 + 事务,可以保证数据不重复,不丢失。
不过幂等需要我们自己来实现,如 MySQL 在业务场景保存数据时使用了 INSERT INTO ...ON DUPLICATE KEY UPDATE
语法,不存在时插入,存在时更新,是天然支持幂等性的。
事务通常使用 2PC 即 二次提交来实现。