【发布时间】:2018-01-04 22:34:14
【问题描述】:
我有一个基于微服务的应用程序,它从 Kafka 主题中读取消息。当服务关闭时,如果有任何消息写入主题,我希望消费者在下次启动并运行时读取这些消息。但是当服务关闭时,我错过了所有消息。如何让消费者阅读服务关闭时未阅读的消息?
当我的微服务启动并且任何消息都返回到主题时,我会收到所有消息。
我的 application.properties:
spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.startOffset=latest
spring.cloud.stream.bindings.input.consumer.resetOffsets=true
spring.cloud.stream.bindings.input.consumer.instanceCount=3
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false
//这是我的微服务根目录下的消费者代码
@EnableBinding(Sink.class)
public class Consumer {
@ServiceActivator(inputChannel = Sink.INPUT)
public void consoleSink(Object payload){
logger.info("Type: "+ payload.getClass() + " which is byte array");
logger.info( "Payload: " + new String((byte[])payload));
} }
感谢任何解决此问题的线索。
【问题讨论】:
标签: spring microservices spring-cloud