【问题标题】:Spring Cloud Stream with Kafka - message not being read after restarting the consumerSpring Cloud Stream with Kafka - 重新启动消费者后未读取消息
【发布时间】:2018-01-04 22:34:14
【问题描述】:

我有一个基于微服务的应用程序,它从 Kafka 主题中读取消息。当服务关闭时,如果有任何消息写入主题,我希望消费者在下次启动并运行时读取这些消息。但是当服务关闭时,我错过了所有消息。如何让消费者阅读服务关闭时未阅读的消息?

当我的微服务启动并且任何消息都返回到主题时,我会收到所有消息。

我的 application.properties:

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.startOffset=latest
spring.cloud.stream.bindings.input.consumer.resetOffsets=true
spring.cloud.stream.bindings.input.consumer.instanceCount=3
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false

//这是我的微服务根目录下的消费者代码

@EnableBinding(Sink.class) 
public class Consumer { 
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void consoleSink(Object payload){
        logger.info("Type: "+ payload.getClass() + " which is byte array");
        logger.info( "Payload: " + new String((byte[])payload));
    } }

感谢任何解决此问题的线索。

【问题讨论】:

    标签: spring microservices spring-cloud


    【解决方案1】:

    设置以下属性帮助我解决了我的问题。

    spring.cloud.stream.bindings.input.destination=test
    spring.cloud.stream.bindings.input.consumer.headerMode=raw
    spring.cloud.stream.bindings.input.consumer.startOffset=latest
    spring.cloud.stream.bindings.input.consumer.resetOffsets=true
    spring.cloud.stream.bindings.input.consumer.instanceCount=3
    spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false
    spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
    spring.cloud.stream.kafka.binder.autoCreateTopics=false
    spring.cloud.stream.bindings.input.group=testGroup50
    spring.cloud.stream.bindings.input.partitioned=false
    

    谢谢,

    BR

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-30
      • 2016-06-21
      • 2017-06-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-02
      • 2022-01-06
      相关资源
      最近更新 更多