【问题标题】:How to manually acknoweldge offset after multiple commits多次提交后如何手动确认偏移量
【发布时间】:2019-09-10 10:00:28
【问题描述】:

我正在开发一个 Spring kafka 应用程序。我正在向它发送一批记录。我只想在所有记录发送后而不是在每条记录之后确认偏移量。

我已经尝试过下面的代码(从这里复制https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_usage_examples),但这会确认每条记录之后的偏移量,而不是记录批次。我先设置以下属性

spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment =     message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

批量记录后如何手动确认?

【问题讨论】:

    标签: spring apache-kafka spring-kafka


    【解决方案1】:

    如您所见here,您可以收到消息列表,因此您的示例如下所示:

    @StreamListener(Sink.INPUT)
     public void process(List<Message<?>> messages) {
         Acknowledgment acknowledgment =     message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
         if (acknowledgment != null) {
             System.out.println("Acknowledgment provided");
             acknowledgment.acknowledge();
         }
     }
    

    完成所有记录后,您可以致电acknowledgment.acknowledge();

    【讨论】:

      猜你喜欢
      • 2018-05-05
      • 1970-01-01
      • 1970-01-01
      • 2021-04-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-18
      相关资源
      最近更新 更多