【问题标题】:Composition of Conditional Futures in ScalaScala 中条件期货的组成
【发布时间】:2016-11-03 06:21:16
【问题描述】:

我有一些适用于简单案例(2 个 Futures)的代码,但我无法找到将其推广到无限数量的 Futures 的方法。

我想做的是创建一些调用未来的代码,当这个未来完成时,调用另一个,当这个完成时调用另一个,依此类推。

我需要在调用下一个调用之前完成每个调用的结果,因为我可能不需要再次调用它(这是我的停止条件)。

我知道这可以通过递归显式解决,但如果可能的话,我希望使用 for 理解和/或折叠的解决方案。感觉应该有这样的解决方案,但是写的不对。

这是一个生成两个随机整数列表的函数

def nextValue: Future[List[Int]] = Future{
  Thread.sleep(1000)
  val num1 = Random.nextInt(10)
  val num2 = Random.nextInt(10)
  List(num1,num2)
}

现在我想组成无限多个这样的未来并在最后加入它们(列表的一个未来)

我只是为了测试目的调用 await.result

这适用于 2 个级别,但如何将其推广到 N 次调用?

Await.result({
  nextValue.flatMap{ value1 =>
    nextValue.map{ value2 =>
      value1 ++ value2
    }
  }
},1.minute)

【问题讨论】:

  • 如果每个Future 都在前一个完成后才启动,那么使用Future 有什么意义呢?您能否将 foldStream 包装在单个 Future 中,当满足停止条件时将完成?

标签: scala asynchronous monads future


【解决方案1】:
Future.sequence((0 to 100).map(_ => nextValue)).map(_.flatten)

用法:

scala> Future.sequence((0 to 100).map(_ => nextValue)).map(_.flatten)
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = scala.concurrent.impl.Promise$DefaultPromise@692e028d

scala> Await.result(res3, duration.Duration.Inf)
res4: scala.collection.immutable.IndexedSeq[Int] = Vector(5, 4, 3, 0, 4, 6, 0, 8, 0, 0, 4, 6, 2, 7, 4, 9, 8, 8, 6, 9, 1, 4, 5, 5, 8, 2, 2, 7, 6, 0, 5, 6, 6, 5, 9, 6, 3, 5, 7, 1, 3, 2, 5, 3, 3, 1, 8, 4, 6, 7, 5, 1, 3, 5, 7, 4, 1, 5, 9, 4, 5, 0, 1, 8, 5, 0, 0, 7, 4, 2, 4, 2, 2, 0, 4, 1, 6, 3, 8, 2, 1, 3, 5, 5, 8, 3, 6, 1, 3, 2, 9, 4, 9, 4, 7, 5, 7, 8, 7, 9, 5, 2, 5, 0, 2, 5, 6, 8, 6, 2, 3, 2, 0, 8, 9, 3, 9, 2, 7, 5, 1, 7, 1, 1, 8, 6, 8, 0, 5, 5, 6, 0, 8, 8, 3, 6, 4, 2, 7, 1, 0, 3, 3, 3, 3, 2, 8, 7, 3, 3, 5, 1, 6, 3, 3, 7, 8, 9, 9, 9, 1, 9, 9, 8, 1, 1, 5, 8, 1, 1, 7, 6, 3, 2, 5, 0, 4, 3, 0, 9, 9, 1, 2, 0, 3, 6, 2, 6, 8, 6, 6, 3, 9, 7, 1, 3, 5, 9, 6, 5, 6, 2)

或者使用 scalaz/cats:

//import scalaz._,Scalaz._
// --or--
//import cats.syntax.traverse._
//import cats.std.list._
//import cats.std.future._

(0 to 100).toList.traverseM(_ => nextValue)

来自here的解释:

traverseM(f) 等价于 traverse(f).map(_.join),其中 join 是 扁平化的scalaz名称。它作为一种“提升”很有用 平面地图”:


如果你想要一些条件并且仍然需要保持异步,你可以使用 fs2:

import fs2._
import fs2.util._

def nextValue: Task[List[Int]] = Task.delay{
  import scala.util.Random
  val num1 = Random.nextInt(10)
  val num2 = Random.nextInt(10)
  if(num1 > 5) List(num1,num2) else List()
}
Stream.repeatEval(nextValue).takeWhile(_.size > 0).runLog.map(_.flatten).unsafeRun

https://github.com/functional-streams-for-scala/fs2/blob/series/0.9/docs/guide.md

Iteratees 也可以达到同样的效果:

猫:https://github.com/travisbrown/iteratee 或 scalaz-iteratee 包

一般来说,你不能用 fold 来实现它,因为它实际上是 unfold 并且在 scala 中没有很好的支持展开,因为标准库的 Stream 不能泛化到 Monad/ApplicativeFunctor(比如 @ 987654331@ 确实) - 您只能通过在每个展开步骤上执行 Await.result 来检查条件。

【讨论】:

  • 我认为 OP 正在寻找由退出条件确定的长度集合,即获取 nextValue 直到我们得到终端结果。
  • 你说得对,我没有仔细阅读问题 - 将更新我的答案
  • 是的...我想测试 getNext 的结果,看看它是否有效,如果不是则终止。这可能吗?
  • 我已经添加了外部库的解决方案,现在将添加经典解决方案
  • 不幸的是,如果没有 Await.result 在每个步骤中使用标准库中的展开,我想不出任何实现。我已经添加了解释原因。
猜你喜欢
  • 2015-05-17
  • 2016-05-02
  • 2017-03-26
  • 2020-03-31
  • 2015-08-08
  • 2013-10-18
  • 1970-01-01
  • 2019-03-23
  • 1970-01-01
相关资源
最近更新 更多