【问题标题】:Reactor - Flux requires non filter subscriberReactor - Flux 需要非过滤器订阅者
【发布时间】:2017-01-06 14:47:41
【问题描述】:

我正在使用 Reactor,并且正在创建一个通量,我正在向其发布一些事件。 我的问题是我使用过滤器创建的订阅者会在一段时间后失败,除非我在通量上添加非过滤器订阅者。

import reactor.core.publisher.EmitterProcessor

class PublishSubscribe {

companion object {
    @JvmStatic
    fun main(args: Array<String>) {
        val publisher = EmitterProcessor.create<String>().connect()
        writeAndGet(publisher)
        writeAndGet(publisher)
        writeAndGet(publisher)

    }

    fun writeAndGet(publisher: EmitterProcessor<String>) {
        val result = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()

        val result2 = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()


        Thread.sleep(1000)

        publisher.onNext("unu")
        publisher.onNext("end")

        try {

            println("X=" + result.blockMillis(3000))
            println("Y=" + result2.blockMillis(3000))

        } catch (e: Exception) {
            e.printStackTrace()
        }
        println(result.isTerminated)
        println(result2.isTerminated)
        println("---")

    }

}

}

如果我是一个额外的订阅者,该代码可以正常工作。

...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe()  //this solves the issue
writeAndGet(publisher)
...

关于我做错了什么有什么想法吗?

最好的问候

【问题讨论】:

  • 它是如何“失败”的?你的期望是什么,你观察到什么?在旁注中,使用Processor 通常应该是最后的手段......也许你可以找到一种方法来使用Flux(例如Flux.create)来做你想做的事情?
  • 它因 java.lang.IllegalStateException 失败:Mono 阻塞读取超时。 Yaroslav Stavnichiy 在下面解释了原因。

标签: project-reactor


【解决方案1】:

EmitterProcessor.create() 创建处理器时将autoCancel 标志设置为true。这意味着一旦所有订阅者取消订阅,它就会自动取消。

让您的订阅者退订的不是filter,而是takeUntil 运营商。

添加一个额外的永久订阅者可以防止处理器自动取消,但这似乎不是一个好的解决方案。

要使您的测试用例正常工作,您必须使用EmitterProcessor.create(false) 创建处理器。这会将autoCancel 设置为false,因此您可以一次又一次地重新订阅。

【讨论】:

  • 很好的答案。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-08
  • 2021-12-19
相关资源
最近更新 更多