【发布时间】:2016-11-25 03:09:25
【问题描述】:
我刚开始学习适用于 Android 的 rxJava,并希望实现常见的用例:
- 从缓存中请求数据并显示给用户
- 从网络请求数据
- 服务器更新存储中的数据并自动显示给用户
传统上最好的场景是使用 CursorLoader 从缓存中获取数据,在单独的线程中运行 Web 请求并通过 content provider 将数据保存到磁盘,content provider 自动通知监听器和 CursorLoader 自动更新 UI。
在 rxJava 中,我可以通过运行两个不同的观察者来做到这一点,如下面的代码所示,但我没有找到如何将这两个调用组合成一个来达到我的目标的方法。谷歌搜索显示了这个线程,但它看起来只是从缓存中获取数据或从 Web 服务器获取数据,但不要同时执行 RxJava and Cached Data
代码sn-p:
@Override
public Observable<SavingsGoals> getCachedSavingsGoal() {
return observableGoal.getSavingsGoals()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
@Override
public Observable<SavingsGoals> getRecentSavingsGoal() {
return api.getSavingsGoals()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
model.getCachedSavingsGoal().subscribe(new Observer<SavingsGoals>() {
@Override
public void onCompleted() {
// no op
}
@Override
public void onError(Throwable e) {
Log.e(App.TAG, "Failed to consume cached data");
view.showError();
}
@Override
public void onNext(SavingsGoals savingsGoals) {
Log.d(App.TAG, "Show the next item");
if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
view.showData(savingsGoals.getSavingsGoals());
} else {
view.showError();
}
}
});
model.getRecentSavingsGoal().subscribe(new Observer<SavingsGoals>() {
@Override
public void onCompleted() {
// no op
}
@Override
public void onError(Throwable e) {
Log.e(App.TAG, "Failed to consume data from the web", e);
view.showError();
}
@Override
public void onNext(SavingsGoals savingsGoals) {
if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
view.showData(savingsGoals.getSavingsGoals());
} else {
view.showError();
}
}
});
此外,当前方法的问题之一是缓存和 Web 数据不能保证按顺序运行。过时的数据可能是最新的并覆盖来自网络的最新数据。
为了解决这个问题,我实现了通过时间戳过滤的观察者合并:它从缓存中获取数据,将其传递给下一个观察者,如果缓存过时,则向网络发出新的调用 - 通过时间戳过滤解决线程竞争的情况.但是,这种方法的问题是我无法从这个 Observable 返回缓存数据——我需要等待两个请求完成它们的工作。
代码 sn-p。
@Override
public Observable<Timestamped<SavingsGoals>> getSavingGoals() {
return observableGoal
.getTimestampedSavingsGoals()
.subscribeOn(Schedulers.io())
.flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
@Override
public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> cachedData) {
Log.d(App.FLOW, "getTimestampedSavingsGoals");
return getGoalsFromBothSources()
.filter(filterResponse(cachedData));
}
})
.subscribeOn(AndroidSchedulers.mainThread());
}
private Func1<Timestamped<SavingsGoals>, Boolean> filterResponse(Timestamped<SavingsGoals> cachedData) {
return new Func1<Timestamped<SavingsGoals>, Boolean>() {
@Override
public Boolean call(Timestamped<SavingsGoals> savingsGoals) {
return savingsGoals != null
&& cachedData != null
&& cachedData.getTimestampMillis() < savingsGoals.getTimestampMillis()
&& savingsGoals.getValue().getSavingsGoals().size() != 0;
}
};
}
private Observable<Timestamped<SavingsGoals>> getGoalsFromBothSources() {
Log.d(App.FLOW, "getGoalsFromBothSources:explicit");
return Observable.merge(
observableGoal.getTimestampedSavingsGoals().subscribeOn(Schedulers.io()),
api.getSavingsGoals()
.timestamp()
.flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
@Override
public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> savingsGoals) {
Log.d(App.FLOW, "getGoalsFromBothSources:implicit");
return observableGoal.saveAllWithTimestamp(savingsGoals.getTimestampMillis(), savingsGoals.getValue().getSavingsGoals());
}
}))
.subscribeOn(Schedulers.io());
}
您知道在一个观察者中执行此操作的方法吗?
可能的解决方案:
@Override
public Observable<SavingsGoals> getSavingGoals() {
return api.getSavingsGoals()
.publish(network ->
Observable.mergeDelayError(
observableGoal.getSavingsGoals().takeUntil(network),
network.flatMap(new Func1<SavingsGoals, Observable<SavingsGoals>>() {
@Override
public Observable<SavingsGoals> call(SavingsGoals savingsGoals) {
return observableGoal.saveAll(savingsGoals.getSavingsGoals());
}
})
)
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
- 抱歉,IDE 中的热替换隐藏了这种方法的问题:第一个如果网络不可用并且缓存线程首先完成,错误将终止整个合并(由 mergeDelayError 解决),第二个是如果缓存是为空并从 Web 请求返回的第一个数据将不会返回给订阅者。正如您所看到的,我的方法在保存和传统合并后返回 Observable,正如我在代码中显示的那样正确处理这种情况,但由于某种原因 takeUntil 不能。问题仍然悬而未决。
【问题讨论】:
-
rxJava,顺便说一下,不是 JavaRx
-
谢谢,添加了正确的名称。