【问题标题】:Connectable Observer: observe termination of all subscribersConnectable Observer:观察所有订阅者的终止
【发布时间】:2016-10-19 18:11:50
【问题描述】:

我有一个有多个订阅者的可连接观察者。

每个订阅者都会计算一些业务逻辑。例如,一个订阅者在每次onNext 调用时将结果存储在数据库中,其他订阅者将其结果累积到内存中,当onCompleted 调用时将它们写入文件。我想知道他们什么时候都完成了工作,所以我可以继续做其他事情(与其他可连接的观察者聚合,从数据库中读取输出数据等)。

这就是我观察终止的方式。它之所以有效,是因为订阅者与观察者在同一个线程中执行。

public Observable<Boolean> observeTermination() {
    return Observable.defer(() -> {
        try {
            start();
            return Observable.just(true);
        } catch (RuntimeException e) {
            return Observable.just(false);
        }
    });
}

void start() {
    Observable<List<Foo>> fooBatchReaderObservable = fooBatchReader.createObservable(BATCH_SIZE);

    ConnectableObservable<List<Foo>> connectableObservable = fooBatchReaderObservable.publish();
    subscribers.forEach(s -> connectableObservable.subscribe(s));

    connectableObservable.connect();
}

所以当observeTermination 被调用时,我不想在start 方法中执行逻辑,但只有当有人订阅它时。 有没有办法让观察变得更好?

好吧,这一切都很糟糕。问题是我需要在可观察到的某处调用connect 并返回boolean 结果作为终止的提示。

【问题讨论】:

    标签: java rx-java reactive-programming


    【解决方案1】:

    不是一个正确的答案,但它需要空间来正确解释。如果你可以处理 Observables 而不是 Subscribers 会容易得多;这使您在编写它们时具有更大的灵活性;

    假设你有components:

    Collection<Function<Observable<T>, Observable<?>>> components:
    Observable<T> tObs = ... .publish().autoConnect(components.size());
    Observable
    .from(components)
    .flatMap(component -> component.apply(tObs))
    .ignoreElements()
    .doOnTerminate() // or .defaultIfEmpty(...), or .switchIfEmpty(...)
    .subscribe(...);
    

    事实上,我想说你根本不应该在这里订阅,只需创建 observable 并返回它,让它可用​​于你代码的其他部分的组合。

    【讨论】:

    • 我曾考虑将订阅者更改为将被调用以获取相同数据的普通函数,但这将它们的使用限制为仅onNext。所以没有办法执行留在onStart,onCompleted,onError中的订阅者的逻辑
    • 也许我可以观察到Subsriptions,它是isUnsubscribe 方法?有办法吗?编辑:但是connect方法还是有问题
    • 如果你让订阅者函数接受一个 observable 并根据输入生成另一个 observable 链,你可以处理错误,onNext,在它们中启动和完成。
    • 我玩过你提供的小样,你能帮我理解如何触发tObs发射物品吗?因为当我订阅Observable.from(components) 时,它不会被调用
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-26
    • 1970-01-01
    • 2015-02-24
    • 2018-06-11
    相关资源
    最近更新 更多