【问题标题】:Couchbase async batch error handlingCouchbase 异步批处理错误处理
【发布时间】:2017-04-12 15:32:17
【问题描述】:

我想,我从 Observables+Couchbase 异步 api 得到了一个大脑爆炸 :) 有人可以帮我吗? 已经与批处理操作斗争了几天,仍然无法理解如何在正确处理错误的情况下进行批处理操作。

假设我想批量更新 Couchbase 中的一些文档。 如果我使用同步 API,它看起来像:

List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys

for (JsonDocument item : items) {
   try {
      try {
         item.content().put("age", 42);
         bucket.replace(item);
      } catch (CASMismatchException e) {
        // retry
        bucket.get(item.id()).content().put("age", 42);
        bucket.replace(item);
      }
   } catch (Exception e) {
      // handle error which doesn't stop execution for other items
      // for example, add item id to list of failed items in response
      errorHandler.handleError(item.id(), e);
   }
}

但这不是并行的,文档说异步 API 更有效。 我无法理解的是如何通过 Observables 创建这样的流程,我试过了:

Observable.from(items)
.flatMap(item -> {
   item.content().put("age", 42);
   return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
   // what to do? return another observable which does retry logic above?
   // how do I know what item has failed?
   // I don't have ID of that item, nor I can extract it from passed Exception
   // why onErrorResumeNext is getting called only once (if one item fails)
   // and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)

任何帮助将不胜感激! 谢谢

【问题讨论】:

  • 我想你最好需要通过 Observable.create 构建 Observable 并使用 try catch 然后直接在其中重试,如果重试工作发出此项目,如果没有则发出错误。

标签: java asynchronous rx-java couchbase observable


【解决方案1】:

你可以这样做:

  Observable.from(items)
            .flatMap(item -> {
                item.content().put("age", 42);
                return bucket.async()
                        .replace(item)
                        .retry((count, throwable) -> count == 1 && throwable instanceof CASMismatchException)
                        .onErrorReturn(e -> {
                            errorHandler.handleError(item.id(), e);
                            return null; //or item, if you need the item further down the stream
                        })
                        .subscribeOn(Schedulers.io()); //not sure if it's needed with bucket.async()
            })
            .subscribeOn(<something>) //with this scheduler the put() method will be executed 
            .subscribe();

这个想法是通过flatMap() 将每个项目处理分离到一个单独的 Observable,因为每个重试逻辑都针对单个项目,而不是整个流。 重试运算符使用为您提供重试计数和异常的谓词进行操作,因此在您的情况下,我们第一次重试仅使用特定的CASMismatchException 异常,然后对于错误我们可以简单地执行onErrorReturn 并处理其他错误,您如果您想继续处理,甚至可以退货。
需要注意的一件事是调度程序,我不确定 Couchbase 在进行async() 调用时是否默认在io() 上运行。另外,考虑这一行:

 item.content().put("age", 42);

将在最后一个 subscribeOn() 上执行,因为它将在主流订阅调度程序上完成。

【讨论】:

  • 感谢您的帮助!我会尽量听从你的建议。我描述的案例非常简单,但是通过 item Observable 而不是 list Observable 捕获错误的想法是我真正错过的,现在它很可能会解决我的问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2010-11-12
  • 2018-07-29
  • 2016-03-30
  • 2011-04-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多