【问题标题】:Andorid rxJava: how to get data from cache and and the same time update it in the background?Android rxJava:如何从缓存中获取数据并同时在后台更新它?
【发布时间】:2016-11-25 03:09:25
【问题描述】:

我刚开始学习适用于 Android 的 rxJava,并希望实现常见的用例:

  • 从缓存中请求数据并显示给用户
  • 从网络请求数据
  • 服务器更新存储中的数据并自动显示给用户

传统上最好的场景是使用 CursorLoader 从缓存中获取数据,在单独的线程中运行 Web 请求并通过 content provider 将数据保存到磁盘,content provider 自动通知监听器和 CursorLoader 自动更新 UI。

在 rxJava 中,我可以通过运行两个不同的观察者来做到这一点,如下面的代码所示,但我没有找到如何将这两个调用组合成一个来达到我的目标的方法。谷歌搜索显示了这个线程,但它看起来只是从缓存中获取数据或从 Web 服务器获取数据,但不要同时执行 RxJava and Cached Data

代码sn-p:

@Override
public Observable<SavingsGoals> getCachedSavingsGoal() {
    return observableGoal.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

@Override
public Observable<SavingsGoals> getRecentSavingsGoal() {
    return api.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

    model.getCachedSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume cached data");
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            Log.d(App.TAG, "Show the next item");
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

    model.getRecentSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume data from the web", e);
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

此外,当前方法的问题之一是缓存和 Web 数据不能保证按顺序运行。过时的数据可能是最新的并覆盖来自网络的最新数据。

为了解决这个问题,我实现了通过时间戳过滤的观察者合并:它从缓存中获取数据,将其传递给下一个观察者,如果缓存过时,则向网络发出新的调用 - 通过时间戳过滤解决线程竞争的情况.但是,这种方法的问题是我无法从这个 Observable 返回缓存数据——我需要等待两个请求完成它们的工作。

代码 sn-p。

    @Override
public Observable<Timestamped<SavingsGoals>> getSavingGoals() {
    return observableGoal
            .getTimestampedSavingsGoals()
            .subscribeOn(Schedulers.io())
            .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                @Override
                public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> cachedData) {
                    Log.d(App.FLOW, "getTimestampedSavingsGoals");
                    return getGoalsFromBothSources()
                            .filter(filterResponse(cachedData));
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread());
}

private Func1<Timestamped<SavingsGoals>, Boolean> filterResponse(Timestamped<SavingsGoals> cachedData) {
    return new Func1<Timestamped<SavingsGoals>, Boolean>() {
        @Override
        public Boolean call(Timestamped<SavingsGoals> savingsGoals) {
            return savingsGoals != null
                    && cachedData != null
                    && cachedData.getTimestampMillis() < savingsGoals.getTimestampMillis()
                    && savingsGoals.getValue().getSavingsGoals().size() != 0;
        }
    };
}

private Observable<Timestamped<SavingsGoals>> getGoalsFromBothSources() {
    Log.d(App.FLOW, "getGoalsFromBothSources:explicit");
    return Observable.merge(
            observableGoal.getTimestampedSavingsGoals().subscribeOn(Schedulers.io()),
            api.getSavingsGoals()
                    .timestamp()
                    .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                        @Override
                        public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> savingsGoals) {
                            Log.d(App.FLOW, "getGoalsFromBothSources:implicit");
                            return observableGoal.saveAllWithTimestamp(savingsGoals.getTimestampMillis(), savingsGoals.getValue().getSavingsGoals());
                        }
                    }))
                    .subscribeOn(Schedulers.io());
}

您知道在一个观察者中执行此操作的方法吗?

可能的解决方案:

@Override
public Observable<SavingsGoals> getSavingGoals() {
    return api.getSavingsGoals()
            .publish(network ->
                    Observable.mergeDelayError(
                            observableGoal.getSavingsGoals().takeUntil(network),
                            network.flatMap(new Func1<SavingsGoals, Observable<SavingsGoals>>() {
                                @Override
                                public Observable<SavingsGoals> call(SavingsGoals savingsGoals) {
                                    return observableGoal.saveAll(savingsGoals.getSavingsGoals());
                                }
                            })
                    )
            )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
  • 抱歉,IDE 中的热替换隐藏了这种方法的问题:第一个如果网络不可用并且缓存线程首先完成,错误将终止整个合并(由 mergeDelayError 解决),第二个是如果缓存是为空并从 Web 请求返回的第一个数据将不会返回给订阅者。正如您所看到的,我的方法在保存和传统合并后返回 Observable,正如我在代码中显示的那样正确处理这种情况,但由于某种原因 takeUntil 不能。问题仍然悬而未决。

【问题讨论】:

  • rxJava,顺便说一下,不是 JavaRx
  • 谢谢,添加了正确的名称。

标签: android rx-java


【解决方案1】:

对于第一个问题:您可以使用 doOnNext 方法从网络结果中保存结果,它看起来像这样

public Observable<NetworkResponse> getDataFromNetwork(
      final Request request) {
    return networkCall.doOnNext(networkResponse -> saveToStorage(networkResponse);
  }

现在要结合存储和在线的两个结果,最好的方法是结合发布和合并。我推荐观看this talk。 代码看起来像这样

  public Observable<Response> getData(final Request request) {

    return dataService.getDataFromNetwork(request)
        .publish(networkResponse ->  Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)));
  }

我问你为什么要使用发布和合并? publish 方法使响应在回调中可访问。 takeUntil 表示您将从存储中获取数据,但如果由于某种原因,网络调用在访问存储数据完成之前完成,您将停止它。这样,您可以确保始终显示来自网络的新数据,即使在从存储中获取旧数据之前已经完成。

最后但并非最不重要的一点是,在您的订阅者 OnNext 中,只需将项目添加到列表中即可。 (list.clear 和 list.addAll) 或类似的功能,或者在你的情况下 view.showData()

编辑:如果网络出错时通话中断,请在末尾添加 onErrorResumeNext。

  public Observable<Response> getData(final Request request) {

    return dataService.getDataFromNetwork(request)
        .publish(networkResponse ->  Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)))
        .onErrorResumeNext(dataService.getDataFromStorage(request);
  }

【讨论】:

  • 文森特,感谢您指出这种方法——这是正确的方法!
  • 对不起,IDE中的热替换隐藏了这种方法的问题:第一个如果网络不可用并且缓存线程首先完成,错误将终止整个合并(由mergeDelayError解决),第二个是如果缓存为空并从 Web 请求返回第一个数据,则不会返回订阅者。正如您所看到的,我的方法在保存和传统合并后返回 Observable,正如我在代码中显示的那样正确处理这种情况,但由于某种原因 takeUntil 不能。问题仍未解决。
  • 为什么不将保存代码移动到网络 observable 的 doOnNext,而不是将其保存在 flatMap 中。 flatMap 用于将一个对象转换为另一个对象。保存功能应该在doOnNext上,因为它应该只有在成功时才保存。
  • 刚刚在您的潜在解决方案代码中发现了错误。 takeUntil 应该在从磁盘获取数据之后。不是在合并两个 observable 之后。
  • 感谢您提醒我注意正确使用 takeUntil!但是,如果网络呼叫有问题(没有互联网连接),仍然是不调用 onNext 的问题。在这种情况下,我收到了对网络命令的 onError 调用,并且无法从缓存中获取结果(mergeDelayError 应该可以解决这个问题,但它没有更新问题中的代码)
【解决方案2】:

我建议只“收听”本地数据,并在 API 响应到来时刷新它。 假设要获取本地数据,您有类似的东西:

@Nonnull
public Observable<SomeData> getSomeDataObservable() {
    return Observable
            .defer(new Func0<Observable<SomeData>>() {
                @Override
                public Observable<SomeData> call() {
                    return Observable.just(getSomeData());
                }
            });
}

因此,您需要添加每次更新本地数据时都会发出的 PublishSubject (refreshSubject):

@Nonnull
public Observable<SomeData> getSomeDataObservableRefreshable() {
    return refreshSubject.startWith((Object)null).switchMap(new Func1() {
        public Observable<T> call(Object o) {
            return getSomeDataObservable();
        }
    }     
}

现在你只需要订阅getSomeDataObservableRefreshable(),每次数据来自API时,你更新它并制作refreshSubject .onNext(new Object())

我还建议查看rx-java-extensions lib,它有很多 RxAndroid 的“酷工具”。例如,您的问题的解决方案是:

@Nonnull
public Observable<SomeData> getSomeDataObservable() {
    return Observable
            .defer(new Func0<Observable<SomeData>>() {
                @Override
                public Observable<SomeData> call() {
                    return Observable.just(getSomeData());
                }
            })
            .compose(MoreOperators.<SomeData>refresh(refreshSubject));
}

【讨论】:

  • 谢谢,以双向通道为主题的方法很有趣。但是有一些缺陷:第一个,客户端仍然需要订阅 API 调用,否则框架永远不会触发逻辑,直到有人订阅了这个 observable;另一个是客户端需要将一些虚假数据传递给主题以触发它的逻辑。
  • 是真的,所以你可以订阅API调用,如果出现错误,显示错误信息。
猜你喜欢
  • 2021-09-08
  • 1970-01-01
  • 2018-10-24
  • 2018-04-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-12-31
  • 1970-01-01
相关资源
最近更新 更多