【发布时间】:2020-01-13 22:47:10
【问题描述】:
下面是我的一个组件的代码,它启动一个Flux 并订阅它,所有这些都在类的构造函数中。这个特定的通量来自 mongoChangeStreams 调用。除非出现错误,否则它不会终止。
我希望订阅始终保持活动状态,因此我会在事件因错误而终止时重新启动订阅。
在我看来,在构造函数中调用 subscribe 可能是个坏主意。此外,我可能应该启用一种方法来优雅地关闭这个应用程序,方法是在关闭期间调用取消订阅。
我的猜测是我应该实现SmartLifeCycle,但我不知道该怎么做。是否有在 Flux 订阅支持的组件上实现 SmartLifeCycle 的标准方法?
@Component
class SubscriptionManager(
private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
private val fooProcessor: FooProcessor
) {
private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor
private fun subscribe() = buildSubscriber().also {
fooFluxProvider.foos().subscribe(it)
}
private fun buildSubscriber(): BaseSubscriber<Foo> {
return object : BaseSubscriber<Foo>() {
override fun hookOnSubscribe(subscription: Subscription) {
subscription.request(1)
}
override fun hookOnNext(value: Foo) {
//process the foo
fooProcessor.process(value)//sync call
//ask for another foo
request(1)
}
override fun hookOnError(throwable: Throwable) {
logger.error("Something went wrong, restarting subscription", throwable)
//restart the subscription. We'll recover if we're lucky
subscription = subscribe()
}
}
}
}
【问题讨论】:
标签: spring kotlin project-reactor