【问题标题】:Observable to batch like Lmax Disruptor可观察到像 Lmax Disruptor 这样的批处理
【发布时间】:2016-02-10 02:20:54
【问题描述】:

熟悉lmax ring buffer (disruptor) 的人都知道,这种数据结构的最大优势之一是它可以批处理传入的事件,并且当我们有一个消费者可以利用批处理使系统自动适应负载时,你投入的事件越多越好。

我想知道我们不能使用 Observable 实现相同的效果(针对批处理功能)。我已经尝试过Observable.buffer,但这是非常不同的,缓冲区将等待并且不会发出批处理,而预期数量的事件没有到达。我们想要的完全不同。

鉴于订阅者正在等待来自Observable<Collection<Event>> 的批处理,当单个项目到达流时,它会发出一个由订阅者处理的单个元素批处理,同时它正在处理其他元素到达并被收集到下一个批处理中,一旦订阅者完成执行,它就会获得下一批,其中包含自上次处理以来到达的事件数量......

因此,如果我们的订阅者足够快地一次处理一个事件,它就会这样做,如果负载变高,它仍然会有相同的处理频率,但每次处理的事件更多(从而解决背压问题)。 .. 不像缓冲区会粘住并等待批次填满。

有什么建议吗?还是我应该使用环形缓冲区?

【问题讨论】:

标签: java reactive-programming rx-java observable disruptor-pattern


【解决方案1】:

RxJava 和 Disruptor 代表了两种不同的编程方法。

我没有使用 Disruptor 的经验,但根据视频讨论,它基本上是一个大型缓冲区,生产者在其中像消防水管一样发出数据,而消费者则旋转/屈服/阻塞,直到数据可用。

另一方面,RxJava 的目标是非阻塞事件传递。我们也有环形缓冲区,特别是在 observeOn 中,它充当生产者和消费者之间的异步边界,但它们要小得多,我们通过应用协同例程方法来避免缓冲区溢出和缓冲区膨胀。协同程序归结为发送到您的回调的回调,因此您可以回调我们的回调以按照您的节奏向您发送一些数据。此类请求的频率决定了节奏。

有些数据源不支持此类合作流式传输,并且需要 onBackpressureXXX 运算符之一,如果下游请求不够快,它将缓冲/丢弃值。

如果您认为批量处理数据比一个一个处理更有效,您可以使用 buffer 运算符,它具有重载来指定缓冲区的持续时间:例如,您可以有 10 毫秒的值数据量,与在此期间到达的值数量无关。

通过请求频率控制批量大小很棘手,并且可能会产生无法预料的后果。问题通常是,如果您从批处理源request(n) 表示您可以处理 n 个元素,但源现在必须创建 n 个大小为 1 的缓冲区(因为类型是 Observable<List<T>>)。相反,如果没有调用请求,则操作员会缓冲数据,从而导致更长的缓冲区。这些行为会在处理过程中引入额外的开销,如果您真的能够跟上并且还必须将冷源变成消防软管(否则您所拥有的基本上就是buffer(1)),这本身现在可能导致缓冲区膨胀。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-12-21
    • 2013-11-23
    • 2013-05-28
    • 2016-02-16
    • 1970-01-01
    • 2012-05-07
    • 1970-01-01
    相关资源
    最近更新 更多