【发布时间】:2019-07-05 21:15:54
【问题描述】:
我正在使用 @KafkaListener 从 kafka 主题中消费,我有一个应用程序逻辑来处理不同消费者组中的多个消费者的每条记录。
现在我的问题陈述是-我必须将消费的消息发送到第三方休息端点。 如果消息发送失败到 rest-endpoint,我不应该提交偏移量,需要根据可配置的次数重试发送消息。 可配置重试次数后,如果发送消息失败。我需要记录异常的原因并提交偏移量。
我正在使用 spring kafka 2.2.0。以下是我的手动确认配置
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual-immediate
logging.level.org.springframework.kafka=debug
到目前为止,我可以使用不同消费者发布到主题中的消息。并且还能够手动确认来自不同消费者的偏移量。例如
@KafkaListener(topics = "testTopic",groupId="group-1")
public void consumerOne(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("OF PayLoad:{}", record);
logger.info("OF value:"+record.value());
logger.info("OF Offset:{}",record.offset());
ack.acknowledge();
}
@KafkaListener(topics = "testTopic",groupId="group-2")
public void consumerTwo(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("MF PayLoad_2:{}", record);
logger.info("MF value_2:"+record.value());
logger.info("MF Offset_2:{}",record.offset());
//ack.acknowledge();
}
这里,consumerTwo 没有确认消息。我想根据可配置的时间重新尝试将所有消息再次发送给消费者。
任何示例或建议都会有很大帮助。
【问题讨论】:
-
你应该至少展示你到目前为止所拥有的东西。该框架具有许多功能,但这并不意味着我们必须将答案作为所有自定义用例的示例。
-
您好,Artem,感谢您的回复。我正在编辑我的问题,以包括我能够达到的目标。
标签: spring-boot apache-kafka kafka-consumer-api spring-kafka