【发布时间】:2021-12-12 00:48:39
【问题描述】:
我正在使用@StreamListener(spring cloud stream)来消费来自主题(input-channel)的消息,进行一些处理并保存到某个缓存或数据库中。
我的要求是,如果 DB 在处理消费的消息时出现故障,我想暂停主要消费者(输入通道),并从另一个 TOPIC(INPUT56-CHANNEL)开始消费,并且一旦它消耗所有来自 INPUT56-CHANNEL 的消息(没有很多),我想再次恢复主要消费者(输入通道)。
这能实现吗??
【问题讨论】:
-
您可以通过 Spring Integration 实现这一点,但我认为 Stream 没有足够复杂的路由来完成您所说的关闭。 (FWIW,听起来您正在尝试 circuit-breaker 模式的变体。)
-
查看docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/… 以控制绑定生命周期 - 并查看this answer 以了解如何检测所有辅助记录何时已被消耗。
-
谢谢加里,会试试的
-
嗨@GaryRussell你能分享任何例子吗,我对两件事感到困惑第一,二级消费者如何知道它需要在数据库关闭后立即开始消费(也许我可以使用一些切换或something), 2, 二级消费者消费完后, 一级消费者怎么知道, 消费完又恢复。暂停和恢复我正在使用“BindingsEndpoint”。效果很好。
-
设置二级绑定为
autostartup: false;暂停主绑定时,启动辅助绑定;当您收到一个空闲事件(或所有空闲事件,如果使用并发),停止辅助绑定并恢复主绑定。
标签: java spring-boot apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka