Kafka 最多一次、至少一次、精确传递一次

首先了解一下消息传递语义(message delivery semantic)。这是一个通用的概念,也就是消息传递过程中消息传递的保证性。分为三种:

  • 最多一次(at most once):消息生产者发送消息,但不确保消息是否成功到达,消息可能丢失,也可能被处理,但最多只会被处理一次,不会重复。

    • 特点:可能丢失、不会重复
    • Kafka 中,生产者在无 ACK 的机制下保证消息最多发送一次。
  • 至少一次(at least once):消息不会丢失,但可能被处理多次。

    • 特点:可能重复、不会丢失。
    • Kafka 中,生产者使用 ACK 机制确保消息至少传递一次。即使生产者收到了部分的 ACK,它也可能会重新发送消息,导致消息可能被消费者多次处理。
  • 精确一次又叫恰好一次(exactly once):消息 被处理 且只会被处理一次。

    • 特点:不丢失、不重复、就一次。
    • Kafka 中,生产者使用 ACK 机制配合幂等性和事务特性,以实现精确一次处理的语义。

而 Kafka 其实有两次消息传递,一次生产者发送消息给 Kafka,一次消费者去 Kafka 消费消息。

两次传递都会影响最终结果,两次都是精确一次,最终结果才是精确一次。两次中有一次会丢失消息,或者有一次会重复,那么最终的结果就是可能丢失或者重复的。

1、Produce 端消息传递

Produce 端的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for (int i = 1; i <= 600; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
System.out.println("testkafka"+i);
}
kafkaProducer.close();

参数 acks 有三个值选择:

  • 0:producer 完全不管 broker 的处理结果,回调也就没有用了,这个参数不能保证消息成功发送。但是这种吞吐量最高。
  • all 或者 -1:leader broker 会等消息写入,并且 ISR 都写入后才会响应,这种只要 ISR 有副本存活就肯定不会丢失,但吞吐量最低。
  • 1:默认值。leader broker 自己写入后就响应,不会等待 ISR 其他的副本写入,只要 leader broker 存活就不会丢失,即保证了不丢失,也保证了吞吐量。

设置为 0 时,实现了 at most once。

设置为 1 时,消息不会丢失。但是有一种情况:消息成功写入,而这个时候由于网络问题 producer 没有收到写入成功的响应,producer 就会开启重试的操作,直到网络恢复,消息就发送了多次。这就是 at least once。

Kafka producer acks 的默认值为 1,所以默认的 producer 级别是 at least once。并不能 exactly once。

2、Consumer 端消息传递

consumer 是靠 offset 保证消息传递的。

consumer 消费的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}

其中有一个参数是 enable.auto.commit

  • 设置为 true。 consumer 在消费之前提交位移,实现了 at most once。默认值。
  • 设置为 false。consumer 在消费后提交,实现了 at least once。

Kafka consumer 的 enable.auto.commit 的默认值为 true,所以默认的 consumer 级别是 at most once。也并不能 exactly once。

3、精确一次 producer

通过了解 producer 端与 consumer 端的设置,我们发现 Kafka 在两端的配置都不能做到 exactly once,好像 Kafka 的消息一定会丢失或者重复的,是不是没有办法做到 exactly once 了呢?

在 Kafka 0.11.0.0 版本之前 producer 端确实是不可能的,但是在 Kafka 0.11.0.0 版本之后,Kafka 正式推出了 idempotent producer。

Kafka 分别通过 幂等性(Idempotence)和事务(Transaction)这两种机制实现了 精确一次(exactly once)语义。

3.1、幂等性(Idempotence)

幂等 这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。

幂等性最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。

Kafka 中,Producer 默认不是幂等性的,如果开启仅需设置一个参数即可,即

1
2
3
props.put("enable.idempotence", ture)

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。

生产者客户端

Kafka 为了实现幂等性,它在底层设计架构中引入了 ProducerID 和 SequenceNumber。

  • PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
  • Sequence Numbler。(对于每个 PID,该 Producer 发送数据的每个 <Topic, Partition> 都对应一个从 0 开始单调递增的 Sequence Number。

生产者每发送一条消息就会将 <PID,分区> 对应的序列号的值 + 1。

服务端 Broker

Broker 端也会为每一对 <PID, 分区> 维护一个序列号,并且每次 Commit 一条消息时将其对应序号递增。

对于收到的每一条消息,会 判断 Broker 端的序列号 SN_old 和 接收到消息中的序列号 SN_new 进行对比;

  • 只有 SN_new = SN_old + 1,Broker 才会接收它。

  • 如果 SN_new < SN_old + 1,说明消息被重复写入,Broker 可以直接将其丢弃。

  • 如果 SN_new > SN_old + 1,说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出 OutOfOrderSequenceException 异常,这是一个严重的异常,后续的诸如 send() beginTransaction() commitTransaction 等方法的调用都会抛出 IllegalStateException 异常

注意:引入序列号来实现幂等也只是针对每一对 <PID,分区> 而言的,也就是说,Kafka 的幂等只能保证单个生产者会话 session 中单个分区的幂等。

实现比较简单,同样的限制也比较大:

  • 首先,它只能保证单分区上的幂等性。即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
    • 因为 SequenceNumber 是以 Topic + Partition 为单位单调递增的,如果一条消息被发送到了多个分区必然会分配到不同的 SequenceNumber,导致重复问题。
  • 其次,它只能实现单会话上的幂等性。不能实现跨会话的幂等性。当你重启 Producer 进程之后,这种幂等性保证就丧失了。
    • 重启 Producer 后会分配一个新的 ProducerID,相当于之前保存的 SequenceNumber 就丢失了。

3.2、事务(Transaction)

Kafka 的事务概念类似于我们熟知的数据库提供的事务。

Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

设置事务型 Producer 的方法也很简单,满足两个要求即可:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true
  • 设置 Producer 端参数 transactional.id。最好为其设置一个有意义的名字。

此外,你还需要在 Producer 代码中做一些调整,如下:

1
2
3
4
5
6
7
8
9
producer.initTransactions();
try {
producer.beginTransaction(); //开启一个事务
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}

和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。

这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。

实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:

  • read_uncommitted:默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。
    • 很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。
    • 当然了,它也能看到非事务型 Producer 写入的所有消息。

4、精确一次 consumer

上面 producer 端实现了 exactly once,那么 consumer 端呢?

consumer 端由于可能无法消费事务中所有消息,并且消息可能被删除,所以事务并不能解决 consumer 端exactly once 的问题,我们可能还是需要自己处理这方面的逻辑。比如自己管理 offset 的提交,不要自动提交,也是可以实现 exactly once 的。

还有一个选择就是使用 Kafka 自己的流处理引擎,也就是 Kafka Streams,

设置 processing.guarantee=exactly_once,就可以轻松实现 exactly once 了。

Reference


Kafka 最多一次、至少一次、精确传递一次
https://flepeng.github.io/043-Kafka-Kafka-最多一次、至少一次、精确传递一次/
作者
Lepeng
发布于
2021年3月8日
许可协议