【问题标题】:Disable Flowable/Observable Buffer禁用 Flowable/Observable 缓冲区
【发布时间】:2017-07-11 07:39:04
【问题描述】:

我无法始终从使用 combineLatest 运算符获得的组合中获取最后一个值。

我有 2 个热流 (a, b) 生成 高频事件(每个事件每 100 毫秒一个事件):

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);

与 combineLatest 相结合,这需要将近 300 毫秒才能完成。

Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB,        OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
                System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
                System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
                Thread.sleep(300);
            }

combiner 执行一次后,我想处理生成的事件的最后一个组合,意思是 (lastA, lastB)。

组合流的默认行为是将所有事件组合缓存在其自己的缓冲区中,以便组合流接收非常旧的组合,并且这个时间间隔正在爆炸。

我应该如何更改我的代码以禁用此缓冲区并始终接收最后一个组合?

【问题讨论】:

    标签: java java-8 rx-java reactive-programming rx-java2


    【解决方案1】:

    您可以在flowAflowB 上应用onBackpressureLatest,并使用overload of combineLatest 指定预取量。

    Flowable.combineLatest(
        Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
        a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
        1
    )
    .onBackpressureLatest()
    .observeOn(Schedulers.newThread(), false, 1)
    

    不幸的是,没有同时采用 BiFunction 和 bufferSize 的重载,因此您必须恢复为转换数组元素。

    编辑

    应用第二个onBackpressureLatest 并限制observeOn 上的缓冲区大小应该可以让您更接近所需的模式,尽管combineLatest 并非针对此用例。您可能需要一些多样本运算符。

    【讨论】:

    • 我在这里用一个简化的例子尝试了你的解决方案,似乎还有缓冲。代码可在此处获得:link
    • 更新了我的答案。
    • 这是有效的,我相应地更新了要点。谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-04
    • 2023-04-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多