【发布时间】:2016-04-11 17:53:27
【问题描述】:
我想创建一个可观察对象,它仅在新值与前一个值不同时才从底层热可观察对象发出值(从 -1 开始)。此外,我希望将最新值立即发送给新订阅者。我想出了以下代码:
PublishSubject<Integer> hotObservable = PublishSubject.create();
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(0);
但是,在使用java.lang.IllegalStateException: more produced than requested 向新订阅者发送第一个值(始终为-1,无论hotObservable 在订阅observable 之前发出什么)后,此操作会失败
有趣的是,当我没有自动连接,而是手动订阅时:
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect();
observable.subscribe().unsubscribe();
以下订阅者正常工作,接收最后一个值,然后更新。
我无法让replay(1).autoConnect(0) 工作,我觉得我错过了一些东西 - 为什么订阅和取消订阅会起作用,而 autoConnect(0) 不会?创建这种可观察对象的正确方法是什么?
除非我使用autoConnect(); observable.subscribe().unsubscribe(),否则这是失败的测试方法:
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(); // With (0) it fails
observable.subscribe().unsubscribe(); // Needed if we don't auto connnect
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValues(3);
【问题讨论】:
标签: java rx-java reactive-programming