【问题标题】:Spring Kafka - which batch error handler to use?Spring Kafka - 使用哪个批处理错误处理程序?
【发布时间】:2023-04-10 20:08:01
【问题描述】:

我刚开始使用spring-kafka 2.6.4。 我创建了批量轮询消息的消费者工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(MeterRegistry meterRegistry) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(kafkaProperties.getTopicConcurrency());
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}

现在我想定义适当的错误处理程序,使消费者停留在失败的记录上而不是轮询下一批。

我应该使用哪个错误处理程序?

谢谢。

【问题讨论】:

    标签: spring spring-kafka


    【解决方案1】:

    RecoveringBatchErrorHandler (https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh) 现在是首选(自 2.5 起为默认)批处理错误处理程序。您的侦听器可以抛出特定异常来指示批处理中的哪条记录失败。

    还有一个RetryingBatchErrorHandlerhttps://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh)。

    【讨论】:

    • 谢谢,RecoveringBatchErrorHandler 看起来是个好主意。还有一件事——如果我想让消费者陷入失败记录的无限循环,我该如何实现呢?通过设置 FixedBackOff.UNLIMITED_ATTEMPTS?
    • 只是为了确定——只有失败记录的分区才会进入循环,消费者会像往常一样继续从其他分区轮询。对吗?
    • 正确;只要你抛出一个BatchListenerFailedException 指示批处理中的哪条记录失败。该批次之前的所有记录(来自任何分区)都将提交其偏移量。失败的记录和任何剩余的记录将导致从每个分区中第一个未处理的记录重播查找操作。
    猜你喜欢
    • 2021-08-31
    • 1970-01-01
    • 2021-05-06
    • 2020-07-17
    • 2017-07-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-05
    相关资源
    最近更新 更多