【发布时间】:2021-02-26 02:02:50
【问题描述】:
对于Flux,您可以指定订阅时发生的自定义操作。例如Flux.create(emitter -> someApi.setCallback(emitter::next)) 会在订阅时设置一些 API 挂钩。
我们如何为接收器提供这样的自定义订阅操作?例如。 Sinks.unsafe().many().unicast().onBackpressureBuffer(someAction)?
我已经设法使用Flux.concat(Mono.fromRunnable(someAction), sink) 使其工作,但我想这会增加不必要的开销,因此并不理想。
【问题讨论】:
-
有一个
doOnSubscribe运算符。这符合你的目的吗?sink.asFlux().doOnSubscribe(...) -
@MartinTarjányi 这正是我想要的,谢谢——如果你发帖作为答案,我会接受
标签: java reactive-programming project-reactor