【问题标题】:mapping over scala rx observables when only onComplete is called仅调用 onComplete 时映射 scala rx observables
【发布时间】:2017-11-08 14:48:15
【问题描述】:

我使用 scala observables 从 couchbase 获取项目,然后我使用 map、flatMap、zip 来转换结果。问题是,如果一个项目在 couchbase 中不存在,那么例如 .zip 不会仅调用 onComplete。示例:

import rx.lang.scala._

def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
  values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id.
  ...
}

所以我想要:

  1. 返回 k -> v
  2. 的映射
  3. 我让 .zip 将 k 耦合到返回的 v(如果不存在,我希望 v 类似于 None
  4. 我看到如果 db 中不存在任何项目,则根本不调用 zip。

我想到在运行上面的代码后,扫描ids 输入参数,并为每个没有压缩值的参数添加一个 id 到它的值,但这就像添加另一个流,我希望 zip 处理这两个现有和不存在的行。

我应该如何处理?如何让.zip 处理现有行和非现有行?

【问题讨论】:

    标签: scala rx-java rx-scala couchbase-java-api


    【解决方案1】:

    不要使用zip() 运算符。相反,只需使用flatMap()materialize().take(1)materialize() 会将onComplete() 事件转换为可以映射到NoneNotification,而带有值的Notification 将映射到Some(value)

    def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
      val values = Observable.from(keyValueIds)
       .flatMap(id => couchbaseBucket.async()
         .get(id)
         .materialize()
         .take(1)
         .map( res => if ( res.isOnComplete() )
                         (id, None)
                      else 
                         (id, Some(res.getValue))
      ...
    }
    

    【讨论】:

    • .materialize().take(1).map( res => if ( res.isOnComplete() ) (id, None) else (id, Some(res.getValue)) 可以只替换为 .headOption
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-15
    • 1970-01-01
    • 1970-01-01
    • 2017-04-20
    相关资源
    最近更新 更多