【发布时间】:2019-09-10 10:00:28
【问题描述】:
我正在开发一个 Spring kafka 应用程序。我正在向它发送一批记录。我只想在所有记录发送后而不是在每条记录之后确认偏移量。
我已经尝试过下面的代码(从这里复制https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_usage_examples),但这会确认每条记录之后的偏移量,而不是记录批次。我先设置以下属性
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
批量记录后如何手动确认?
【问题讨论】:
标签: spring apache-kafka spring-kafka