官网保平安:https://kafka.apache.org/
基础 在 Kafka 中,消费者(Consumer)如果没有确认消息(即没有提交Offset),下一次拉取数据的结果将取决于 Offset 的当前值和消费者的配置。以下是几种可能的情况:
自动提交 Offset。如果 Kafka 消费者配置为自动提交 Offset(enable.auto.commit=true
),则 Kafka 消费者会在满足某些条件时自动提交当前消费到的 Offset。这些条件通常包括调用 poll()
方法处理消息、达到一定的时间间隔(auto.commit.interval.ms
)等。然而,如果消费者在消息处理过程中出现异常或崩溃,且这些异常发生在自动提交 Offset 之前,那么已经消费但尚未确认的消息可能会在消费者重启后被重新消费。
下一次拉取结果:
在自动提交 Offset 的情况下,如果消费者没有因为异常而中断,那么下一次拉取的数据将是当前 Offset 之后的新数据。
如果消费者因为异常而中断,并且 Offset 没有正确提交,那么重启后的消费者可能会从上一个自动提交的 Offset 开始重新消费数据,包括之前已经消费但尚未确认的消息。
手动提交 Offset。如果 Kafka 消费者配置为手动提交 Offset(enable.auto.commit=false
),那么消费者需要在确保消息被正确处理后再显式地调用 API(如commitSync()
或 commitAsync()
)来提交 Offset。
下一次拉取结果:
在手动提交 Offset 的情况下,如果消费者在消息处理过程中没有提交 Offset 就发生了异常或崩溃,那么重启后的消费者将从上一次成功提交的 Offset 开始消费数据。这意味着,如果消费者在消费消息后没有提交 Offset 就崩溃了,那么这些消息在消费者重启后会被重新消费。
Kafka 重试机制 在 Kafka 如何保证消息不丢失这里,我们提到了 Kafka 的 消费者 重试机制。
网上关于 Spring Kafka 的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下是根据 spring-kafka-2.9.3 源码重新梳理一下。
消费失败会怎么样? 在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了?
生产者代码:
1 2 3 for (int i = 0 ; i < 10 ; i++) { kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i)) }
消费者消代码:
1 2 3 4 5 6 7 8 @KafkaListener (topics = {KafkaConst.TEST_TOPIC},groupId = "apple" )private void customer (String message) throws InterruptedException { log.info("kafka customer:{}" ,message); Integer n = Integer.parseInt(message); if (n%5 ==0 ){ throw new RuntimeException(); } }
在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 test-0@95
重试多次后会被跳过。
1 2 3 2023-08-10 12:03:32.918 DEBUG 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Skipping seek of: test-0@95 2023-08-10 12:03:32.918 TRACE 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Seeking: test-0 to: 96 2023-08-10 12:03:32.918 INFO 9700 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-apple-1, groupId=apple] Seeking to offset 96 for partition test-0
因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。
默认会重试多少次?★★★ 默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔?
看源码 FailedRecordTracker
类有个 recovered
函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override public boolean recovered (ConsumerRecord << ? , ? > record, Exception exception, @Nullable MessageListenerContainer container, @Nullable Consumer << ? , ? > consumer) throws InterruptedException { if (this .noRetries) { attemptRecovery(record, exception, null , consumer); return true ; } Map < TopicPartition, FailedRecord > map = this .failures.get(); if (map == null ) { this .failures.set(new HashMap < > ()); map = this .failures.get(); } TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition); this .retryListeners.forEach(rl - > rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get())); long nextBackOff = failedRecord.getBackOffExecution().nextBackOff(); if (nextBackOff != BackOffExecution.STOP) { this .backOffHandler.onNextBackOff(container, exception, nextBackOff); return false ; } else { attemptRecovery(record, exception, topicPartition, consumer); map.remove(topicPartition); if (map.isEmpty()) { this .failures.remove(); } return true ; } }
其中, BackOffExecution.STOP
的值为 -1。
1 2 3 4 5 @FunctionalInterface public interface BackOffExecution { long STOP = -1 ; long nextBackOff () ; }
nextBackOff
的值调用 BackOff
类的 nextBackOff()
函数。如果当前执行次数大于最大执行次数则返回 STOP
,既超过这个最大执行次数后才会停止重试。
1 2 3 4 5 6 7 8 9 public long nextBackOff () { this .currentAttempts++; if (this .currentAttempts <= getMaxAttempts()) { return getInterval(); } else { return STOP; } }
那么这个 getMaxAttempts
的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler
。DefaultErrorHandler
默认的构造函数是:
1 2 3 public DefaultErrorHandler () { this (null , SeekUtils.DEFAULT_BACK_OFF); }
SeekUtils.DEFAULT_BACK_OFF
定义的是:
1 2 3 public static final int DEFAULT_MAX_FAILURES = 10 ;public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0 , DEFAULT_MAX_FAILURES - 1 );
DEFAULT_MAX_FAILURES
的值是 10,currentAttempts
从 0 到 9,所以总共会执行 10 次,每次重试的时间间隔为 0。
最后,简单总结一下:Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
如何自定义重试次数以及时间间隔? 从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff
控制的,FixedBackOff
是 DefaultErrorHandler
初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler
初始化的时候传入自定义的 FixedBackOff
即可。重新实现一个 KafkaListenerContainerFactory
,调用 setCommonErrorHandler
设置新的自定义的错误处理器就可以实现。
1 2 3 4 5 6 7 8 9 @Bean public KafkaListenerContainerFactory kafkaListenerContainerFactory (ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); FixedBackOff fixedBackOff = new FixedBackOff(1000 , 5 ); factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff)); factory.setConsumerFactory(consumerFactory); return factory; }
如何在重试失败后进行告警? 自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler
的 handleRemaining
函数,加上自定义的告警等操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf 4jpublic class DelErrorHandler extends DefaultErrorHandler { public DelErrorHandler (FixedBackOff backOff) { super (null ,backOff); } @Override public void handleRemaining (Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) { super .handleRemaining(thrownException, records, consumer, container); log.info("重试多次失败" ); } }
DefaultErrorHandler
只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler
接口。手动实现 CommonErrorHandler
就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。
重试失败后的数据如何再次处理? 当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?
死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被”丢弃”或”死亡”的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
@RetryableTopic
是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RetryableTopic ( attempts = "5" , backoff = @Backoff (delay = 100 , maxDelay = 1000 ) )@KafkaListener (topics = {KafkaConst.TEST_TOPIC}, groupId = "apple" )private void customer (String message) { log.info("kafka customer:{}" , message); Integer n = Integer.parseInt(message); if (n % 5 == 0 ) { throw new RuntimeException(); } System.out.println(n); }
当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler
处理,也可以使用 @KafkaListener
重新消费。