【发布时间】:2016-02-10 02:20:54
【问题描述】:
熟悉lmax ring buffer (disruptor) 的人都知道,这种数据结构的最大优势之一是它可以批处理传入的事件,并且当我们有一个消费者可以利用批处理使系统自动适应负载时,你投入的事件越多越好。
我想知道我们不能使用 Observable 实现相同的效果(针对批处理功能)。我已经尝试过Observable.buffer,但这是非常不同的,缓冲区将等待并且不会发出批处理,而预期数量的事件没有到达。我们想要的完全不同。
鉴于订阅者正在等待来自Observable<Collection<Event>> 的批处理,当单个项目到达流时,它会发出一个由订阅者处理的单个元素批处理,同时它正在处理其他元素到达并被收集到下一个批处理中,一旦订阅者完成执行,它就会获得下一批,其中包含自上次处理以来到达的事件数量......
因此,如果我们的订阅者足够快地一次处理一个事件,它就会这样做,如果负载变高,它仍然会有相同的处理频率,但每次处理的事件更多(从而解决背压问题)。 .. 不像缓冲区会粘住并等待批次填满。
有什么建议吗?还是我应该使用环形缓冲区?
【问题讨论】:
-
我相信这个操作符叫做
bufferIntrospective,见stackoverflow.com/questions/28880247/…
标签: java reactive-programming rx-java observable disruptor-pattern