【问题标题】:"More produced than requested" exception when using `replay` and `autoConnect`使用 `replay` 和 `autoConnect` 时出现“多于请求”异常
【发布时间】:2016-04-11 17:53:27
【问题描述】:

我想创建一个可观察对象,它仅在新值与前一个值不同时才从底层热可观察对象发出值(从 -1 开始)。此外,我希望将最新值立即发送给新订阅者。我想出了以下代码:

PublishSubject<Integer> hotObservable = PublishSubject.create();

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect(0);

但是,在使用java.lang.IllegalStateException: more produced than requested 向新订阅者发送第一个值(始终为-1,无论hotObservable 在订阅observable 之前发出什么)后,此操作会失败 有趣的是,当我没有自动连接,而是手动订阅时:

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect();

observable.subscribe().unsubscribe();

以下订阅者正常工作,接收最后一个值,然后更新。

我无法让replay(1).autoConnect(0) 工作,我觉得我错过了一些东西 - 为什么订阅和取消订阅会起作用,而 autoConnect(0) 不会?创建这种可观察对象的正确方法是什么?

除非我使用autoConnect(); observable.subscribe().unsubscribe(),否则这是失败的测试方法:

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect(); // With (0) it fails

observable.subscribe().unsubscribe(); // Needed if we don't auto connnect

hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber

TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);

subscriber.assertNoErrors();
subscriber.assertValues(3);

【问题讨论】:

    标签: java rx-java reactive-programming


    【解决方案1】:

    上面的代码在 RxJava 1.1.3 上没有出现More produced than requested 错误。

    断言失败的原因是replay 不会从上游请求任何东西,直到它的任何订阅者实际请求。如果TestSubscriber是第一个订阅的,它会触发startWith发出-1,然后切换到不保留任何值的PublishSubject,因此您不会收到任何其他内容。

    我相信您正在寻找的是BehaviorSubject,它保留了最后一个值并从新订阅者开始:

    BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1);
    
    Observable<Integer> observable = hotObservable.distinctUntilChanged();
    
    hotObservable.onNext(1);
    hotObservable.onNext(2);
    hotObservable.onNext(3);
    
    TestSubscriber<Integer> subscriber = TestSubscriber.create();
    observable.subscribe(subscriber);
    
    subscriber.assertNoErrors();
    subscriber.assertValue(3);
    

    【讨论】:

    • 可以确认版本1.1.3工作!我之前在1.1.2。至于BehaviorSubject - 这很有趣,但是hotObservable 实际上是在其他地方创建的。 BehaviorSubject b = BehaviorSubject.create(-1); hotObservable.subscribe(b); return b.distinctUntilChanged() 也是有效的方法吗?
    • 是的,这是一种有效的方法。您可能需要添加 onBackpressureBufferonBackpressureDrop 以防 hotObservable 信号值过快。
    • 我添加了onBackpressureLatest()。非常感谢您的帮助!
    猜你喜欢
    • 1970-01-01
    • 2019-07-23
    • 2018-01-07
    • 2014-10-03
    • 2018-10-19
    • 2011-07-15
    • 1970-01-01
    • 2022-11-08
    • 1970-01-01
    相关资源
    最近更新 更多