【问题标题】:How to subscribe again to Observable without start new process?如何在不启动新进程的情况下再次订阅 Observable?
【发布时间】:2015-12-22 12:29:00
【问题描述】:

我有 Observable

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            if (subscriber.isUnsubscribed()) {
                return;
            }

            for (int i = 0; i < 100; i++) {
               Thread.sleep(100);
              subscriber.onNext("Loading:"+i);
            }
            subscriber.onCompleted();
        }
    });

订阅者

 Subscriber<? super String> sub = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    pDialog.setMessage("Successfully Done!");
                    pDialog.cancel();
                }

            @Override
            public void onError(Throwable e) {
                pDialog.cancel();
            }

            @Override
            public void onNext(String string) {
                pDialog.setMessage(string);
            }
        };

点击按钮后我会这样做:

  compositeSubscription.add(observable.subscribeOn(Schedulers.newThread()).observeOn(
            AndroidSchedulers.mainThread()).subscribe(sub));

活动中:

@Override
protected void onStart() {
    super.onStart();
   compositeSubscription = new CompositeSubscription();
}

 @Override
    protected void onStop() {
        if (!compositeSubscription.isUnsubscribed()&&compositeSubscription.hasSubscriptions())
            compositeSubscription.unsubscribe();
        super.onStop();
    }

单击按钮后,将显示对话框并且消息正在更新。

但是如果我最小化应用程序并再次打开它,订阅会丢失并且消息不会更新,但进程会继续执行。

如何在不启动新进程的情况下再次订阅 Observable?

【问题讨论】:

  • 这里明显的陌生人是Thread.sleep。我现在帮不了你,但是你能用像Observable.interval()Observable.take()这样的异步操作符重写吗?如果没有答案,我稍后再看。
  • @ReutSharabani Thread.sleep - 添加它而不是添加 Http 请求的 BIG 代码
  • @ReutSharabani 如何使用这个Observable.take()
  • 您至少可以将此作为注释添加到您的示例代码中吗?

标签: java android rx-java rx-android


【解决方案1】:

1) 你在这里处理的是一个cold observable,这意味着每次订阅都会调用OnSubscribe.call()。您可以在此处使用 publish()Observable 转换为 ConnectableObservable 或任何其他将 cold 转换为 的运算符热

2) 如果您希望您的 Observable 在从后台返回后继续工作,则可能 onStart/onStop 不是正确的生命周期回调。我会在那里进行 onCreate/onDestroy

3) 取消订阅不会停止OnSubscribe.call() 的执行。最简单的解决方案是检查循环内的 isUn​​subscribed() 并相应地停止。

【讨论】:

    【解决方案2】:

    使用Subject

    Observable<Long> observable = Observable.interval(100, TimeUnit.MILLISECONDS);
    
    PublishSubject<Object> subject = PublishSubject.create();
    observable.subscribe(subject);
    
    Subscription subscription = subject.subscribe(o -> {
        System.out.println("o = " + o);
    });
    
    Thread.sleep(500);
    subscription.unsubscribe();
    
    Subscription subscription2 = subject.subscribe(o -> {
        System.out.println("continued = " + o);
    });
    
    Thread.sleep(500);
    subscription.unsubscribe();
    

    这将产生以下输出:

    o = 0
    o = 1
    o = 2
    o = 3
    o = 4
    continued = 5
    continued = 6
    continued = 7
    continued = 8
    continued = 9
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-10-17
      • 2020-04-19
      • 1970-01-01
      • 1970-01-01
      • 2020-11-24
      • 2018-10-13
      • 1970-01-01
      相关资源
      最近更新 更多