【问题标题】:Observable timeout logic: migration to RxJava2可观察到的超时逻辑:迁移到 RxJava2
【发布时间】:2016-12-19 00:43:50
【问题描述】:

我正在慢慢地将我的应用程序从 RxJava 1 迁移到 RxJava 2。 更新所有代码后一切正常,除了一个用例,我现在有点迷茫,我想我需要回到文档才能正确获取它。

应用程序从网络加载Asset 的集合,并且此操作花费的时间超过 x 毫秒,它会显示加载动画。然后在检索数据时,停止/移除动画并显示数据。

这就是我使用 RxJava 1 所拥有的并且正在工作的:

getAssetsSubscription = new GetAssetsUseCase().execute()
                    .publish(new Func1<Observable<List<Asset>>, Observable<List<Asset>>>() {
                        @Override
                        public Observable<List<Asset>> call(Observable<List<Asset>> o) {
                            return o.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
                                    Observable.fromCallable(new Callable<List<Asset>>() {
                                         @Override
                                         public List<Asset> call() throws Exception {
                                             if (isAdded()) {
                                                 getActivity().runOnUiThread(new Runnable() {
                                                     @Override
                                                     public void run() {
                                                         setLoadingViewVisibility(true);
                                                     }
                                                 });
                                             }
                                             return null;
                                         }
                                     }
                                )
                            ).ignoreElements().mergeWith(o);
                        }
                    })
                    .subscribe(new Subscriber<List<Asset>>() {
                @Override
                public void onCompleted() {
                    // Do things...
                }

                @Override
                public void onError(Throwable e) {
                    // Do things...
                }

                @Override
                public void onNext(List<Asset> assets) {
                    // Do things...
                }
            });

这是我对 RxJava 2 的“翻译”:有了这个,数据永远不会显示,onComplete 总是被调用,但onNext 永远不会。未触发超时时也是如此。

disposables.add(new GetAssetsUseCase().execute().publish(new Function<Observable<List<Asset>>,
                    ObservableSource<List<Asset>>>() {
                @Override
                public ObservableSource<List<Asset>> apply(Observable<List<Asset>> listObservable) throws
                        Exception {
                    return listObservable.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
                            Observable.fromCallable(new Callable<List<Asset>>() {
                                @Override
                                public List<Asset> call() throws Exception {
                                    if (isAdded()) {
                                        getActivity().runOnUiThread(new Runnable() {
                                            @Override
                                            public void run() {
                                                setLoadingViewVisibility(true);
                                            }
                                        });
                                    }
                                    return null;
                                }
                            })
                    ).ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable();
                }
            })
            .subscribeWith(new DisposableObserver<List<Asset>>() {
                @Override
                public void onComplete() {
                    // Do things...
                }

                @Override
                public void onError(Throwable e) {
                    // Do things...
                }

                @Override
                public void onNext(List<Asset> assets) {
                    // Do things...
                }
            }));

【问题讨论】:

  • 您是否考虑过将getActivity().runOnUiThread() 替换为observeOn(AndroidSchedulers.mainThread(),就像在任何不在Rx 之外进行随机线程的合理RxJava 示例中一样?
  • .ignoreElements().mergeWith(Completable).toObservable() - 这个 observable 只会调用 onErroronComplete。元素被忽略,所以onNext 永远不会发生。
  • @Kiskae 是的,这就是我所看到的,很奇怪,因为在这些更改之前它正在工作(或不工作?)。

标签: android rx-java rx-java2


【解决方案1】:

原代码使用Observable#mergeWith(Observable)。 由于 RxJava2 在适当的地方缩小了类型,因此在您的修改代码中更改为 Completable.mergeWith(Completable)

要获得与旧代码相同的行为,您需要更改操作顺序:

  • 来自.ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable()
  • .ignoreElements().&lt;List&lt;Asset&gt;&gt;toObservable().mergeWith(listObservable)

因为Completable.fromObservable(...)基本上等同于Observable#ignoreElements()

此外,return null; 可能会导致 RxJava2 出现问题,因为合同规定事件流中不能有 null 值。考虑将Observable.fromCallable(...) 替换为Completable.fromRunnable(...).toObservable()

【讨论】:

  • 谢谢!这就是我在检查文档后得出的结论,即使它仍然不是 100% 清楚,但我会再次浏览 RxJava2 文档。一个小细节:正确的更改是ignoreElements().&lt;List&lt;Asset&gt;&gt;toObservable().mergeWith(listObservable)
【解决方案2】:

那是因为你的 callable 返回null,这意味着它是一个终端事件。

@Override
public List<Asset> call() throws Exception {
    if(isAdded()) {
        getActivity().runOnUiThread(new Runnable() { 
            @Override
            public void run() {
                setLoadingViewVisibility(true);
            }
        });
    }
    return null;
}

应该是

@Override
public List<Asset> call() throws Exception {
    if(isAdded()) {
        getActivity().runOnUiThread(new Runnable() { // TODO: move after `observeOn(AndroidSchedulers.mainThread())`
            @Override
            public void run() {
                setLoadingViewVisibility(true);
            }
        });
    }
    return Collections.emptyList();
}

【讨论】:

  • 谢谢,但仍然没有接到 onNext 的电话。未触发超时时不调用。
猜你喜欢
  • 2017-03-20
  • 1970-01-01
  • 1970-01-01
  • 2018-05-20
  • 2019-11-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-18
相关资源
最近更新 更多