【问题标题】:asynchronous processing using list of Scala futures with onComplete for exception handling使用带有 onComplete 的 Scala 期货列表进行异步处理以进行异常处理
【发布时间】:2014-06-15 00:14:08
【问题描述】:

我正在尝试进行大量外部服务调用,每个调用都进行异常处理和有条件的进一步处理。我认为使用内部的 .onComplete 扩展这个很好的 (Asynchronous IO in Scala with futures) 示例会很容易,但似乎我不了解范围和/或期货。谁能指出我正确的方向吗?

#!/bin/bash
scala -feature $0 $@
exit
!#

import scala.concurrent.{future, blocking, Future, Await}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Failure}
import scala.language.postfixOps

val keylist = List("key1", "key2")

val myFuts: List[Future[String]] = keylist.map {
  myid => future {
    // this line simulates an external call which returns a future (retrieval from S3)
    val myfut = future { Thread.sleep(1); "START " + myid}

    var mystr = "This should have been overwritten"
    myfut.onComplete {
      case Failure(ex) => {
        println (s"failed with error: $ex")
        mystr = "FAILED"
      }
      case Success(myval) => {
        mystr = s"SUCCESS $myid: $myval"
        println (mystr)
      }
    }
    mystr
  }
}

val futset: Future[List[String]] = Future.sequence(myFuts)
println (Await.result(futset, 10 seconds))

在我的电脑 (Scala 2.10.4) 上打印:

SUCCESS key2: START key2
SUCCESS key1: START key1
List(This should have been overwritten, This should have been overwritten)

我想要(订单不重要):

SUCCESS key2: START key2
SUCCESS key1: START key1
List(SUCCESS key2: START key2, SUCCESS key1: START key1)

【问题讨论】:

    标签: scala asynchronous concurrency future


    【解决方案1】:

    完成时不会返回新的未来,它只是允许您在未来完成时做某事。因此,您的第一个 Future 块在执行 onComplete 之前返回,因此您将取回字符串的原始值。

    我们可以做的是使用一个promise,返回另一个future,这个future由第一个future的结果完成。

      val keylist = List("key1", "key2")
    
      val myFuts: List[Future[String]] = keylist.map {
        myid => {
          // this line simulates an external call which returns a future (retrieval from S3)
          val myfut = Future {
            Thread.sleep(1); "START " + myid
          }
          var mystr = "This should have been overwritten"
          val p = Promise[String]()
          myfut.onComplete {
            case Failure(ex) =>
              println(s"failed with error: $ex")
              mystr = "FAILED"
              p failure ex
            case Success(myval) =>
              mystr = s"SUCCESS $myid: $myval"
              println(mystr)
              p success myval
          }
          p.future
        }
      }
    
      val futset: Future[List[String]] = Future.sequence(myFuts)
      println(Await.result(futset, 10 seconds))
    

    我在这里询问的 mapAll 方法会非常方便: Map a Future for both Success and Failure

    【讨论】:

    • 当我运行它时它超时并出现错误,但有趣的解决方案。我不明白 'p' Promise 如何以与任务相关的值完成。感谢您的帮助
    • 糟糕,添加了。
    • 现在很好用。很高兴看到使用 Promise 的解决方案!我一直想知道它们除了用于竞争性完成之外还有什么用处。
    【解决方案2】:

    我会避免使用onComplete 并尝试使用它对可变变量执行副作用逻辑。相反,我会映射未来并将失败情况处理为返回不同的值。这是您的代码的略微修改版本,在Future 上使用map(通过理解),然后使用recover 处理失败情况。希望这就是您想要的:

    val keylist = List("key1", "key2")
    
    val myFuts: List[Future[String]] = keylist.map {myid => 
    
      // this line simulates an external call which returns a future (retrieval from S3)
      val myfut = future { Thread.sleep(1); "START " + myid}
      val result = for (myval <- myfut) yield {
        val res = s"SUCCESS $myid: $myval"
        println(res)
        res
      }
      result.recover{
        case ex => 
          println (s"failed with error: $ex")
          "FAILED"          
      }
    
    }
    
    val futset: Future[List[String]] = Future.sequence(myFuts)
    println (Await.result(futset, 10 seconds))
    

    【讨论】:

    • 效果很好。 .recover 方法是该模式的关键。我没有意识到有人可以做到这一点。非常感谢!
    • 对于那些感兴趣的人,stackoverflow.com/a/15776974/29771 给出了这个答案和更有效的替代方案。
    猜你喜欢
    • 2020-06-15
    • 2017-09-30
    • 1970-01-01
    • 2020-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-10-23
    相关资源
    最近更新 更多