【问题标题】:Pause message consumption from main kafka stream and start from other kafka topic暂停来自主 kafka 流的消息消费并从其他 kafka 主题开始
【发布时间】:2021-12-12 00:48:39
【问题描述】:

我正在使用@StreamListener(spring cloud stream)来消费来自主题(input-channel)的消息,进行一些处理并保存到某个缓存或数据库中。

我的要求是,如果 DB 在处理消费的消息时出现故障,我想暂停主要消费者(输入通道),并从另一个 TOPIC(INPUT56-CHANNEL)开始消费,并且一旦它消耗所有来自 INPUT56-CHANNEL 的消息(没有很多),我想再次恢复主要消费者(输入通道)。

这能实现吗??

【问题讨论】:

  • 您可以通过 Spring Integration 实现这一点,但我认为 Stream 没有足够复杂的路由来完成您所说的关闭。 (FWIW,听起来您正在尝试 circuit-breaker 模式的变体。)
  • 查看docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/… 以控制绑定生命周期 - 并查看this answer 以了解如何检测所有辅助记录何时已被消耗。
  • 谢谢加里,会试试的
  • 嗨@GaryRussell你能分享任何例子吗,我对两件事感到困惑第一,二级消费者如何知道它需要在数据库关闭后立即开始消费(也许我可以使用一些切换或something), 2, 二级消费者消费完后, 一级消费者怎么知道, 消费完又恢复。暂停和恢复我正在使用“BindingsEndpoint”。效果很好。
  • 设置二级绑定为autostartup: false;暂停主绑定时,启动辅助绑定;当您收到一个空闲事件(或所有空闲事件,如果使用并发),停止辅助绑定并恢复主绑定。

标签: java spring-boot apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka


【解决方案1】:

@StreamListener 已弃用;你应该改用函数式编程模型。

这是一个使用该模型的示例(但相同的技术适用于已弃用的侦听器)。

spring.cloud.function.definition=input1;input2

spring.cloud.stream.bindings.input1-in-0.group=grp1
spring.cloud.stream.bindings.input2-in-0.consumer.auto-startup=false
spring.cloud.stream.bindings.input2-in-0.group=grp2

spring.cloud.stream.kafka.bindings.input2-in-0.consumer.idle-event-interval=5000
@SpringBootApplication
public class So69726610Application {

    public static void main(String[] args) {
        SpringApplication.run(So69726610Application.class, args);
    }

    boolean dbIsDown = true;

    @Autowired
    BindingsLifecycleController controller;

    TaskExecutor exec = new SimpleAsyncTaskExecutor();

    @Bean
    public Consumer<String> input1() {
        return str -> {
            System.out.println(str);
            if (this.dbIsDown) {
                this.controller.changeState("input1-in-0", State.PAUSED);
                this.controller.changeState("input2-in-0", State.STARTED);
                throw new RuntimeException("Paused");
            }
        };
    }

    @Bean
    public Consumer<String> input2() {
        return System.out::println;
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        System.out.println(event);
        // assumes concurrency = 1 (default)
        if (event.getListenerId().contains("input2-in-0")) {
            this.controller.changeState("input1-in-0", State.RESUMED);
            this.exec.execute(() -> this.controller.changeState("input2-in-0", State.STOPPED));
        }
    }

}

【讨论】:

  • 非常感谢 Gary,这正是我所需要的。我会测试一下。
  • 嗨 Gary...我有一个问题,在我的应用程序中,两个主题都在同一个组中,每次我以编程方式启动/停止辅助绑定时,它都会重新平衡组,我们可以避免这种情况吗无论如何都要重新平衡??
  • 出于这个原因,将两个听众放在同一个组中并不是一个好习惯。
  • Gary,还有一个疑问,当我暂停消费者时,我是否可以在不处理的情况下丢弃当前消息并在恢复时再次使用它,尽管我使用了 autoCommit
  • 是的,抛出异常(默认SeekToCurrentErrorHandler)会导致容器恢复后重新投递记录。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-07-09
  • 2022-10-23
  • 2016-08-27
  • 2017-12-23
  • 2018-07-21
  • 2020-12-12
  • 2020-03-05
相关资源
最近更新 更多