【发布时间】:2017-01-06 14:47:41
【问题描述】:
我正在使用 Reactor,并且正在创建一个通量,我正在向其发布一些事件。 我的问题是我使用过滤器创建的订阅者会在一段时间后失败,除非我在通量上添加非过滤器订阅者。
import reactor.core.publisher.EmitterProcessor
class PublishSubscribe {
companion object {
@JvmStatic
fun main(args: Array<String>) {
val publisher = EmitterProcessor.create<String>().connect()
writeAndGet(publisher)
writeAndGet(publisher)
writeAndGet(publisher)
}
fun writeAndGet(publisher: EmitterProcessor<String>) {
val result = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
val result2 = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
Thread.sleep(1000)
publisher.onNext("unu")
publisher.onNext("end")
try {
println("X=" + result.blockMillis(3000))
println("Y=" + result2.blockMillis(3000))
} catch (e: Exception) {
e.printStackTrace()
}
println(result.isTerminated)
println(result2.isTerminated)
println("---")
}
}
}
如果我是一个额外的订阅者,该代码可以正常工作。
...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe() //this solves the issue
writeAndGet(publisher)
...
关于我做错了什么有什么想法吗?
最好的问候
【问题讨论】:
-
它是如何“失败”的?你的期望是什么,你观察到什么?在旁注中,使用
Processor通常应该是最后的手段......也许你可以找到一种方法来使用Flux(例如Flux.create)来做你想做的事情? -
它因 java.lang.IllegalStateException 失败:Mono 阻塞读取超时。 Yaroslav Stavnichiy 在下面解释了原因。
标签: project-reactor