【问题标题】:Single Observable with Multiple Subscribers具有多个订阅者的单个 Observable
【发布时间】:2016-06-27 09:21:52
【问题描述】:

我有一个 Observable<<List<Foo>> getFoo(),它是从改造服务创建的,并且在调用 .getFoo() 方法,我需要与多个订阅者共享它。但是调用.share() 方法会导致重新执行网络调用。重播运算符也不起作用。我知道一个潜在的解决方案可能是.cache(),但我不知道为什么会导致这种行为。

// Create an instance of our GitHub API interface.
Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();

// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share();

Subscription subscription1 = testObservable
       .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors);
            }
         });

Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
         });

subscription1.unsubscribe();
subscription2.unsubscribe();

上面的代码可以重现上述行为。可以调试一下,看到收到的Lists属于不同的MemoryAddress。

我也将 ConnectableObservables 视为一种潜在的解决方案,但这需要我随身携带原始的 observable,并且每次我想添加新的订阅者时都调用 .connect()

.share() 的这种行为在 Retrofit 1.9 之前运行良好。它停止在 Retrofit 2 - beta 上工作。我还没有使用几小时前发布的 Retrofit 2 发布版本对其进行测试。

编辑:2017 年 1 月 2 日

为了以后的读者,我写了一篇文章here详细解释这个案例!

【问题讨论】:

    标签: android retrofit rx-java


    【解决方案1】:

    在与 RxJava 开发人员 Dávid Karnok 联系后,我想对这里发生的事情提出一个完整的解释。

    share() 定义为publish().refCount(),即。 e.源Observable 首先由publish() 转换为ConnectableObservable,但不必“手动”调用connect(),该部分由refCount() 处理。特别是,refCount 在收到第一个订阅时,会在ConnectableObservable 上调用connect();然后,只要至少有一个订阅者,它将保持订阅状态;最后,当订阅者数量降至 0 时,它将向上取消订阅。使用 cold Observables,就像 Retrofit 返回的那样,这将停止任何正在运行的计算。

    如果在这些周期之一之后出现另一个订阅者,refCount 将再次调用connect,从而触发对源 Observable 的新订阅。在这种情况下,它会触发另一个网络请求。

    现在,这通常不会在 Retrofit 1(实际上是 this commit 之前的任何版本)中变得明显,因为这些旧版本的 Retrofit 默认将所有网络请求移动到另一个线程。这通常意味着您的所有subscribe() 调用将在第一个请求/Observable 仍在运行时发生,因此新的Subscribers 将简单地添加到refCount,因此不会触发其他请求/@987654340 @。

    然而,较新版本的 Retrofit 默认情况下不再将工作转移到另一个线程 - 您必须通过调用(例如,subscribeOn(Schedulers.io()))明确地执行此操作。如果您不这样做,所有内容都将停留在当前线程上,这意味着第二个subscribe() 只会在第一个Observable 调用onCompleted 之后发生,因此毕竟Subscribers 已取消订阅并且所有内容都已关闭.现在,正如我们在第一段中看到的,当第二个 subscribe() 被调用时,share() 不得不向源 Observable 引发另一个 Subscription 并触发另一个网络请求。

    所以,要回到从 Retrofit 1 开始的习惯行为,只需添加 subscribeOn(Schedulers.io())

    这应该只执行网络请求 - 大多数时候。但原则上,您仍然可以收到多个请求(并且您总是可以使用 Retrofit 1),但前提是您的网络请求非常快和/或 subscribe() 调用发生相当大的延迟,所以再次,第一个当第二个subscribe() 发生时,请求就完成了。

    因此,Dávid 建议使用cache()(但它有你提到的缺点)或replay().autoConnect()。根据这些release notesautoConnect 的作用就像refCount 的前半部分,或者更准确地说,它是

    与 refCount() 的行为类似,只是它不会断开连接 当订阅者丢失时。

    这意味着该请求只会在第一个 subscribe() 发生时触发,但随后的所有 Subscribers 将接收所有发出的项目,无论其间的任何时间是否有 0 个订阅者。

    【讨论】:

    • 感谢您的详细解释。一定会记住的,尽管我现在很确定我已经为这个问题提供了一些舒适的解决方案:)
    • 太棒了!我只是想添加这个解释,因为replaysharepublish 等已经足够复杂了,对边缘情况进行详细解释并没有什么坏处。
    • 确实很好的答案:)
    【解决方案2】:

    您似乎(隐式)将.share() 返回的ConnectedObservable 转换回普通的Observable。您可能想了解冷热可观察物之间的区别。

    试试

    ConnectedObservable<List<Contributor>> testObservable = retrofit
            .create(GitHub.class)
            .contributors("square", "retrofit")
            .share();
    
    Subscription subscription1 = testObservable
       .subscribe(new Subscriber<List<Contributor>>() {
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable throwable) {
    
        }
    
        @Override
        public void onNext(List<Contributor> contributors) {
            System.out.println(contributors);
        }
    });
    
    Subscription subscription2 = testObservable
            .subscribe(new Subscriber<List<Contributor>>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onNext(List<Contributor> contributors) {
                    System.out.println(contributors + " -> 2");
                }
            });
    
    testObservable.connect();
    subscription1.unsubscribe();
    subscription2.unsubscribe();
    

    编辑:您不需要每次想要新订阅时都调用connect(),您只需要它来启动可观察对象。我想您可以使用replay() 来确保所有后续订阅者都能获得所有产品

    ConnectedObservable<List<Contributor>> testObservable = retrofit
            .create(GitHub.class)
            .contributors("square", "retrofit")
            .share()
            .replay()
    

    【讨论】:

    • 感谢您的回答。问题是我真的想避免每次都调用 connect 。您确定重播运算符可以正常使用此用例吗?
    • 实际上我测试了它并且它有效。感谢您的时间和您的回复。只是为了这个问题,我已经阅读了热和冷可观察对象之间的区别,但无法通过使用 Retrofit 的网络调用来重现它。如果我使用 Observable.just() ,则共享运算符工作得很好。
    猜你喜欢
    • 2017-01-30
    • 1970-01-01
    • 2019-10-15
    • 1970-01-01
    • 2017-03-02
    • 1970-01-01
    • 2019-09-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多