【问题标题】:Scala - Execute arbitrary number of Futures sequentially but dependently [duplicate]Scala - 顺序但依赖地执行任意数量的期货[重复]
【发布时间】:2014-10-29 02:46:34
【问题描述】:

我正在尝试找出按顺序执行一系列 Future 的最简洁方法,其中一个 Future 的执行取决于前一个。我正在尝试对任意数量的期货执行此操作。

用户案例:

  • 我从我的数据库中检索到了一些 Id。
  • 我现在需要在 Web 服务上检索一些相关数据。
  • 我想在找到有效结果后停止。
  • 我只关心成功的结果。

并行执行所有这些,然后解析返回的结果集合不是一种选择。我必须一次执行一个请求,并且只有在前一个请求没有返回结果时才执行下一个请求。

目前的解决方案是沿着这些思路。使用 foldLeft 执行请求,然后仅在前一个未来满足某些条件时才评估下一个未来。

def dblFuture(i: Int) = { i * 2 }
val list = List(1,2,3,4,5)
val future = list.foldLeft(Future(0)) {
  (previousFuture, next) => {
    for {
      previousResult <- previousFuture
      nextFuture <- { if (previousResult <= 4) dblFuture(next) else previousFuture }
    } yield (nextFuture)
  }
}

这样做的最大缺点是 a) 即使我得到满意的结果,我也会继续处理所有项目;b) 一旦找到我想要的结果,我就会继续评估谓词。在这种情况下,它是一个简单的 if,但实际上它可能更复杂。

我觉得我错过了一个更优雅的解决方案。

【问题讨论】:

  • 我对用例感到困惑,因为它看起来不像您所描述的那种数据流依赖性(“一个未来的执行取决于前一个”),因为你'如果前一个 Future 的结果为 empty,则仅执行下一个 Future。我错过了什么?也就是说,下一个 Future 是否以任何方式依赖于先前的结果,而不仅仅是决定它是否执行?
  • @ChrisMartin 下一次futures执行不仅取决于上一次Future是否成功,还取决于response。例如:如果上一个future包含状态为404的WSResponse,则执行下一个future,否则不执行。
  • @AlexeyRomanov 感谢您指出另一个问题。这个非常相似,唯一的区别是我使用的是一系列期货,而不是多次执行的单一期货。
  • @healsjnr 在该问题中,该流恰好充满了等效的期货,但答案无论如何都不取决于它:)

标签: scala future sequential for-comprehension foldleft


【解决方案1】:

查看您的示例,似乎先前的结果与后续结果无关,而唯一重要的是先前的结果满足某些条件以防止计算下一个结果。如果是这种情况,这里是使用filterrecoverWith 的递归解决方案。

def untilFirstSuccess[A, B](f: A => Future[B])(condition: B => Boolean)(list: List[A]): Future[B] = {
    list match {
        case head :: tail => f(head).filter(condition).recoverWith { case _: Throwable => untilFirstSuccess(f)(condition)(tail) }
        case Nil => Future.failed(new Exception("All failed.."))
    }
 }

filter 只会在Future 完成时调用,recoverWith 只会在Future 失败时调用。

def dblFuture(i: Int): Future[Int] = Future { 
     println("Executing.. " + i)
     i * 2 
 }

val list = List(1, 2, 3, 4, 5)

scala> untilFirstSuccess(dblFuture)(_ > 6)(list)
Executing.. 1
Executing.. 2
Executing.. 3
Executing.. 4
res1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@514f4e98

scala> res1.value
res2: Option[scala.util.Try[Int]] = Some(Success(8))

【讨论】:

    【解决方案2】:

    最简洁的方式,“真正的函数式编程”是 scalaz-stream ;) 但是,您需要从 scala Future 切换到 scalaz.concurrent.Task 以获得“未来结果”的抽象。这有点不同。 Task 是纯粹的,Future 是“运行计算”,但它们有很多共同点。

      import scalaz.concurrent.Task
      import scalaz.stream.Process
    
      def dblTask(i: Int) = Task {
        println(s"Executing task $i")
        i * 2
      }
    
      val list = Seq(1,2,3,4,5)
    
      val p: Process[Task, Int] = Process.emitAll(list)
    
      val result: Task[Option[Int]] =
        p.flatMap(i => Process.eval(dblTask(i))).takeWhile(_ < 10).runLast
    
      println(s"result = ${result.run}")
    

    结果:

    Executing task 1
    Executing task 2
    Executing task 3
    Executing task 4
    Executing task 5
    result = Some(8)
    

    如果您的计算已经是 scala Future,您可以将其转换为 Task

    implicit class Transformer[+T](fut: => SFuture[T]) {
      def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
        import scala.util.{Failure, Success}
        import scalaz.syntax.either._
        Task.async {
          register =>
            fut.onComplete {
              case Success(v) => register(v.right)
              case Failure(ex) => register(ex.left)
            }
        }
      }
    }
    

    【讨论】:

    • 感谢 Eugene,我非常喜欢这个解决方案,但我目前没有使用 ScalaZ,希望有一个使用标准 Scala 未来的解决方案(因此我将@LimbSoup 标记为正确答案)。不过会更详细地介绍 SacalZ。
    猜你喜欢
    • 2012-11-29
    • 1970-01-01
    • 2016-08-21
    • 1970-01-01
    • 2016-07-11
    • 1970-01-01
    • 2019-11-04
    • 2014-03-21
    • 2018-06-13
    相关资源
    最近更新 更多