【问题标题】:Spring Kafka @KafkaListener - Retry sending failed messages and manually commit the offsetSpring Kafka @KafkaListener - 重试发送失败消息并手动提交偏移量
【发布时间】:2019-07-05 21:15:54
【问题描述】:

我正在使用 @KafkaListener 从 kafka 主题中消费,我有一个应用程序逻辑来处理不同消费者组中的多个消费者的每条记录。

现在我的问题陈述是-我必须将消费的消息发送到第三方休息端点。 如果消息发送失败到 rest-endpoint,我不应该提交偏移量,需要根据可配置的次数重试发送消息。 可配置重试次数后,如果发送消息失败。我需要记录异常的原因并提交偏移量。

我正在使用 spring kafka 2.2.0。以下是我的手动确认配置

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.listener.ack-mode=manual-immediate

logging.level.org.springframework.kafka=debug

到目前为止,我可以使用不同消费者发布到主题中的消息。并且还能够手动确认来自不同消费者的偏移量。例如

@KafkaListener(topics = "testTopic",groupId="group-1")
    public void consumerOne(ConsumerRecord<String, String> record,Acknowledgment ack) {
        logger.info("OF PayLoad:{}", record);
        logger.info("OF value:"+record.value());
        logger.info("OF Offset:{}",record.offset());
        ack.acknowledge();

    }
    @KafkaListener(topics = "testTopic",groupId="group-2")
    public void consumerTwo(ConsumerRecord<String, String> record,Acknowledgment ack) {
        logger.info("MF PayLoad_2:{}", record);
        logger.info("MF value_2:"+record.value());
        logger.info("MF Offset_2:{}",record.offset());
        //ack.acknowledge();
    }

这里,consumerTwo 没有确认消息。我想根据可配置的时间重新尝试将所有消息再次发送给消费者。

任何示例或建议都会有很大帮助。

【问题讨论】:

  • 你应该至少展示你到目前为止所拥有的东西。该框架具有许多功能,但这并不意味着我们必须将答案作为所有自定义用例的示例。
  • 您好,Artem,感谢您的回复。我正在编辑我的问题,以包括我能够达到的目标。

标签: spring-boot apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

this answer

在 2.2 中,我们添加了向 SeekToCurrentErrorHandler 添加恢复器的功能,它可以采取一些行动。

我最后评论中的拉取请求显示了如果恢复者成功恢复了消息,如何提交偏移量。这将在周四的 2.2.4 版本中发布。

在您的情况下,恢复器可以调用 REST 服务并在失败时抛出异常。

【讨论】:

    猜你喜欢
    • 2020-08-08
    • 1970-01-01
    • 2022-10-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多