【问题标题】:Implementing smartLifeCycle with a reactor subscription使用 reactor 订阅实现 smartLifeCycle
【发布时间】:2020-01-13 22:47:10
【问题描述】:

下面是我的一个组件的代码,它启动一个Flux 并订阅它,所有这些都在类的构造函数中。这个特定的通量来自 mongoChangeStreams 调用。除非出现错误,否则它不会终止。

我希望订阅始终保持活动状态,因此我会在事件因错误而终止时重新启动订阅。

在我看来,在构造函数中调用 subscribe 可能是个坏主意。此外,我可能应该启用一种方法来优雅地关闭这个应用程序,方法是在关闭期间调用取消订阅。

我的猜测是我应该实现SmartLifeCycle,但我不知道该怎么做。是否有在 Flux 订阅支持的组件上实现 SmartLifeCycle 的标准方法?

@Component
class SubscriptionManager(
    private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
    private val fooProcessor: FooProcessor
)  {

    private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor

    private fun subscribe() = buildSubscriber().also {
        fooFluxProvider.foos().subscribe(it)
    }

    private fun buildSubscriber(): BaseSubscriber<Foo> {
        return object : BaseSubscriber<Foo>() {
            override fun hookOnSubscribe(subscription: Subscription) {

                subscription.request(1)
            }

            override fun hookOnNext(value: Foo) {
                //process the foo
                fooProcessor.process(value)//sync call
                //ask for another foo
                request(1)
            }

            override fun hookOnError(throwable: Throwable) {
                logger.error("Something went wrong, restarting subscription", throwable)
                //restart the subscription. We'll recover if we're lucky
               subscription = subscribe()
            }
        }
    }




}

【问题讨论】:

    标签: spring kotlin project-reactor


    【解决方案1】:
    1. 与其创建一个在异常时重新订阅的订阅者子类,不如在订阅之前在 Flux 上链接 retry* 运算符之一。如果它以异常完成,重试运算符将重新订阅上游 Flux。例如,fooFluxProvider.foos().retry() 将无限期地重试。 retry* 的其他变体可用于更高级的行为,包括可与reactor-extra 中的reactor.retry.Retry 类一起使用的高度可定制的retryWhen
    2. 不要将订阅者传递给subscribe(subscriber),而是调用返回Disposablesubscribe 方法之一。这为您提供了一个对象,您可以稍后在关机期间调用 dispose() 以取消订阅。
    3. 实现SmartLifecycle
      • 在构造函数(或start())中,创建Flux(但不要在构造函数中订阅)
      • start()中,调用flux.subscribe()并将返回的Disposable保存到成员字段中。 start() 方法比构造函数更适合启动后台作业。如果您希望它在后台运行,还可以考虑在.subscribe() 之前链接.subscribeOn(Scheduler)(默认情况下,订阅发生在调用subscribe 的线程上)。
      • stop(),拨打disposable.dispose()

    大概是这样的:

    class SubscriptionManager(
            fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
            fooProcessor: FooProcessor
    ) : SmartLifecycle {
        private val logger = LoggerFactory.getLogger(javaClass)
    
        private val fooFlux = fooFluxProvider.foos()
                // Subscribe on a parallel scheduler to run in the background
                .subscribeOn(Schedulers.parallel())
                // Publish on a boundedElastic scheduler if fooProcessor.process blocks
                .publishOn(Schedulers.boundedElastic())
                // Use .doOnNext to send the foo to your processor
                // Alternatively use .flatMap/.concatMap/.flatMapSequential if the processor returns a Publisher
                // Alternatively use .map if the processor transforms the foo, and you need to operate on the returned value
                .doOnNext(fooProcessor::process)
                // Log if an exception occurred
                .doOnError{ e -> logger.error("Something went wrong, restarting subscription", e) }
                // Resubscribe if an exception occurred
                .retry()
                // Repeat if you want to resubscribe if the upstream flux ever completes successfully
                .repeat()
    
        private var disposable: Disposable? = null
    
        @Synchronized
        override fun start() {
            if (!isRunning) {
                disposable = fooFlux.subscribe()
            }
        }
    
        @Synchronized
        override fun stop() {
            disposable?.dispose()
            disposable = null
        }
    
        @Synchronized
        override fun isRunning(): Boolean {
            return disposable != null
        }
    
    }
    

    【讨论】:

    • 谢谢!这非常有帮助,让我看到了我没有充分考虑的线程问题。关于第 2 点:subscribe 方法的参数是一个 BaseSubscriber,它已经实现了 Disposable
    猜你喜欢
    • 2020-04-01
    • 2014-09-03
    • 1970-01-01
    • 2018-12-09
    • 1970-01-01
    • 1970-01-01
    • 2021-04-11
    • 1970-01-01
    • 2019-10-05
    相关资源
    最近更新 更多