【问题标题】:Spring-Cloud-Streams Kafka - How to stop the consumersSpring-Cloud-Streams Kafka - 如何阻止消费者
【发布时间】:2019-08-27 03:35:19
【问题描述】:

我有一个 Spring-Cloud-Streams 客户端读取由多个分区组成的 Kakfa 主题。客户端为它读取的每条 Kafka 消息调用一个 Web 服务。如果重试几次后 web 服务不可用,我想阻止消费者从 Kafka 读取。参考之前的 Stackoverflow 问题 (Spring cloud stream kafka pause/resume binders),我自动连接了 BindingsEndpoint 并调用 changeState() 方法来尝试停止消费者,但日志显示消费者继续阅读调用 changeState() 后来自 Kafka 的消息。

我正在使用 Spring Boot 版本 2.1.2.RELEASE 和 Spring Cloud 版本 Greenwich.RELEASE。 spring-cloud-stream-binder-kafka 的托管版本是 2.1.0.RELEASE。我已设置属性 autoCommitOffset=true 和 autoCommitOnError=false。

下面是我的代码的 sn-p。有什么我错过的吗? changeState() 的第一个输入参数应该是主题名称吗?

如果我希望消费者应用程序在 web 服务不可用时退出,我可以简单地执行 System.exit() 而不需要先停止消费者吗?

@Autowired
private BindingsEndpoint bindingsEndpoint;

...
...
@StreamListener(MyInterface.INPUT)  
    public void read(@Payload MyDTO dto,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        try {

            logger.info("Processing message "+dto);
            process(dto); // this is the method that calls the webservice


        } catch (Exception e) {

            if (e instanceof IllegalStateException || e instanceof ConnectException) {

                bindingsEndpoint.changeState("my.topic.name", 
                    BindingsEndpoint.State.STOPPED);    
                // Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object                     
            }

                e.printStackTrace();
                throw (e);
  }
}

【问题讨论】:

    标签: spring-boot apache-kafka spring-cloud-stream


    【解决方案1】:

    有同样的问题,changeState() 的第一个输入参数应该是绑定名称。它对我有用

    【讨论】:

      【解决方案2】:

      您可以通过使用Binding visualization and control 功能来实现此目的,您可以在其中可视化以及停止/启动/暂停/恢复绑定。

      另外,你知道System.exit() 会关闭整个JVM吗?

      【讨论】:

      • 停止消费者只有在最后一个poll()收到的所有消息都处理完后才会生效。如果你想让它立即停止,你需要将kafka消费者属性max.poll.records设置为1
      猜你喜欢
      • 1970-01-01
      • 2022-01-11
      • 2016-06-21
      • 2017-06-22
      • 1970-01-01
      • 1970-01-01
      • 2020-12-30
      • 2018-02-18
      • 1970-01-01
      相关资源
      最近更新 更多