【发布时间】:2015-12-29 17:47:41
【问题描述】:
我在单个源的接收器会来来去去的上下文中使用 Akka 流。出于这个原因,我从一个来源创建一个发布者,并在需要时附加订阅者:
val publisher= mySource.runWith(Sink.publisher(true))
与
publisher.subscribe(subscriber1)// There will be others
某些订阅者会比其他订阅者更快,我希望让较快的订阅者独立于最慢的订阅者继续前进,至少在发布者的输入缓冲区允许的范围内。该缓冲区由 Sink.publisher(true) 方法的注释描述:
如果
fanout是true,则物化的Publisher将支持多个Subscribers,并且为此阶段配置的inputBuffer的大小成为最快[[org.reactivestreams .Subscriber]] 可以在由于背压而减慢处理速度之前领先于最慢的一个。
我的问题是我不知道如何“为此阶段”设置这个 inputBuffer 值。我见过的最接近的描述在this article 的丢弃广播部分,但这似乎坚持使用流 DSL。我相信我不能使用 DSL,因为我需要不断地附加新的订阅者。
因此,最慢的订阅者会阻碍我的整体流率。我正在尝试做的一个相关方面涉及确保不同的订阅者在不同的线程上运行(不创建明确的参与者作为订阅者)。
【问题讨论】: