【发布时间】:2017-04-12 15:32:17
【问题描述】:
我想,我从 Observables+Couchbase 异步 api 得到了一个大脑爆炸 :) 有人可以帮我吗? 已经与批处理操作斗争了几天,仍然无法理解如何在正确处理错误的情况下进行批处理操作。
假设我想批量更新 Couchbase 中的一些文档。 如果我使用同步 API,它看起来像:
List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys
for (JsonDocument item : items) {
try {
try {
item.content().put("age", 42);
bucket.replace(item);
} catch (CASMismatchException e) {
// retry
bucket.get(item.id()).content().put("age", 42);
bucket.replace(item);
}
} catch (Exception e) {
// handle error which doesn't stop execution for other items
// for example, add item id to list of failed items in response
errorHandler.handleError(item.id(), e);
}
}
但这不是并行的,文档说异步 API 更有效。 我无法理解的是如何通过 Observables 创建这样的流程,我试过了:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
// what to do? return another observable which does retry logic above?
// how do I know what item has failed?
// I don't have ID of that item, nor I can extract it from passed Exception
// why onErrorResumeNext is getting called only once (if one item fails)
// and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)
任何帮助将不胜感激! 谢谢
【问题讨论】:
-
我想你最好需要通过 Observable.create 构建 Observable 并使用 try catch 然后直接在其中重试,如果重试工作发出此项目,如果没有则发出错误。
标签: java asynchronous rx-java couchbase observable