【发布时间】:2016-10-19 18:11:50
【问题描述】:
我有一个有多个订阅者的可连接观察者。
每个订阅者都会计算一些业务逻辑。例如,一个订阅者在每次onNext 调用时将结果存储在数据库中,其他订阅者将其结果累积到内存中,当onCompleted 调用时将它们写入文件。我想知道他们什么时候都完成了工作,所以我可以继续做其他事情(与其他可连接的观察者聚合,从数据库中读取输出数据等)。
这就是我观察终止的方式。它之所以有效,是因为订阅者与观察者在同一个线程中执行。
public Observable<Boolean> observeTermination() {
return Observable.defer(() -> {
try {
start();
return Observable.just(true);
} catch (RuntimeException e) {
return Observable.just(false);
}
});
}
void start() {
Observable<List<Foo>> fooBatchReaderObservable = fooBatchReader.createObservable(BATCH_SIZE);
ConnectableObservable<List<Foo>> connectableObservable = fooBatchReaderObservable.publish();
subscribers.forEach(s -> connectableObservable.subscribe(s));
connectableObservable.connect();
}
所以当observeTermination 被调用时,我不想在start 方法中执行逻辑,但只有当有人订阅它时。
有没有办法让观察变得更好?
好吧,这一切都很糟糕。问题是我需要在可观察到的某处调用connect 并返回boolean 结果作为终止的提示。
【问题讨论】:
标签: java rx-java reactive-programming