【问题标题】:Stop consume message for Stream listener停止使用 Stream 侦听器的消息
【发布时间】:2020-03-06 18:36:45
【问题描述】:

我正在寻找一种方法来停止使用流侦听器消费消息。

@StreamListener(MBinding.M_INPUT)
    public void consumeMessage(Message<MerchantEvent> message) {
    //handle when receive message
 }

cloud:
        stream:
            bindings:
                MInput:
                    destination: topicName
                    group: groupName

我已经用谷歌搜索过了,但现在仍然不知道如何停止消费。有谁知道吗?

【问题讨论】:

    标签: spring-boot spring-kafka


    【解决方案1】:

    您可以使用执行器执行此操作(请参阅Binding Visualization and Control)。或者您可以通过编程方式调用端点。

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So58795176Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So58795176Application.class, args);
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(String in) {
            System.out.println();
        }
    
        @Autowired
        BindingsEndpoint endpoint;
    
        @Bean
        public ApplicationRunner runner() {
            return args -> {
                System.in.read();
                endpoint.changeState("input", State.STOPPED);
                System.in.read();
                endpoint.changeState("input", State.STARTED);
            };
        }
    
    }
    

    【讨论】:

    • 我检查了绑定端点,当我将绑定状态更改为 PAUSE 时,消费者仍然在消费消息本身,所以现在我对绑定的含义感到困惑。我只是看了几天卡夫卡,所以不清楚。您能告诉我这种情况下的绑定是什么意思吗?
    • 暂停/停止在之前的poll()检索到的记录被处理后才会生效。如果要立即暂停/停止,可以设置max.poll.records=1
    • 上一条消息已经处理完毕,max.poll.records 设置为1,然后我执行绑定端点,状态变为PAUSED,但消费者仍在消费消息
    • 我在检查错误的类时出错了,绑定端点运行良好,非常感谢:D
    • @GaryRussell State.STOPPED 状态是私有的。如何使用它?其次,我们是否需要打开执行器才能使上面的代码工作?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-10
    • 1970-01-01
    • 1970-01-01
    • 2019-06-17
    相关资源
    最近更新 更多