【问题标题】:How to cancel Future in Scala?如何在Scala中取消Future?
【发布时间】:2013-04-07 06:02:57
【问题描述】:

Java Futurecancel 方法,可以中断运行Future 任务的线程。例如,如果我将 interruptible 阻塞调用包装在 Java Future 中,我可以稍后将其中断。

Scala Future 不提供 cancel 方法。假设我在Scala Future 中封装了一个可中断 阻塞调用。我怎么能打断它?

【问题讨论】:

    标签: multithreading scala future


    【解决方案1】:

    这还不是Futures API 的一部分,但将来可能会作为扩展添加。

    作为一种解决方法,您可以使用 firstCompletedOf 来包装 2 个未来 - 您要取消的未来和来自自定义 Promise 的未来。然后,您可以通过使承诺失败来取消由此创建的未来:

    def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = {
      val p = Promise[T]
      val first = Future firstCompletedOf Seq(p.future, f)
      val cancellation: () => Unit = {
        () =>
          first onFailure { case e => customCode}
          p failure new Exception
      }
      (cancellation, first)
    }
    

    现在您可以在任何未来调用它来获得“可取消的包装器”。示例用例:

    val f = callReturningAFuture()
    val (cancel, f1) = cancellable(f) {
      cancelTheCallReturningAFuture()
    }
    
    // somewhere else in code
    if (condition) cancel() else println(Await.result(f1))
    

    编辑:

    有关取消的详细讨论,请参阅Learning concurrent programming in Scala 书中的第 4 章。

    【讨论】:

    • 是的,Promise 是 scala 中期货的生产端,因此您可以控制结果。部分链接:scala-lang.org/api/current/index.html#scala.concurrent.Promisedocs.scala-lang.org/overviews/core/futures.html#promises
    • 谢谢。为什么标准 Scala 库不包含这个 cancel
    • 不幸的是,这不是 100% 可靠的。实际上,在您调用 cancelcustomCode 实际停止未来主体的时间之间(例如,customCode 设置了一个布尔标志,由未来主体检查以了解是否中止),任何事情都可能发生。特别是未来的身体可能会开始执行。最终结果:cancellable 返回的 future 表示它已被取消,但 future 的主体实际上已执行。一旦未来的身体产生任何副作用,这就是一个真正的问题。这使得使用你的代码实际上很危险。
    • 由于所讨论的原因,提供Future.cancel 不是一个好主意;另一个问题是 Future 是一个共享的只读句柄,因此它不应该提供干扰其他阅读器的方法。您可以做的是将 Future 传递到您要运行的代码中并让该代码定期检查 Future,然后您有一个原则性的方法来通过完成相应的 Promise 来中断计算。
    • Future.cancel 真的很有帮助。我最近在创建连接到不同数据库以收集统计数据的期货时遇到了一个问题。一些 dbs 有挂起连接的问题,这使得未来永远运行。由于 stat 收集定期运行,所以猜猜看,我的应用程序最终耗尽了所有可用的 cpu。我意识到了这个问题,并试图在数据库修复之前找到一种让未来超时的方法,但到目前为止还没有找到好的答案:(
    【解决方案2】:

    我没有对此进行测试,但这扩展了 Pablo Francisco Pérez Hidalgo 的答案。我们没有阻塞等待java Future,而是使用中间Promise

    import java.util.concurrent.{Callable, FutureTask}
    import scala.concurrent.{ExecutionContext, Promise}
    import scala.util.Try
    
    class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
      private val promise = Promise[T]()
    
      def future = promise.future
    
      private val jf: FutureTask[T] = new FutureTask[T](
        new Callable[T] {
          override def call(): T = todo
        }
      ) {
        override def done() = promise.complete(Try(get()))
      }
    
      def cancel(): Unit = jf.cancel(true)
    
      executionContext.execute(jf)
    }
    
    object Cancellable {
      def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
        new Cancellable[T](executionContext, todo)
    }
    

    【讨论】:

    • 很好的优化!更多 Java 风格的代码,但减少线程使用是值得的。我喜欢 Stack Overflow 的协同作用
    • @PabloFranciscoPérezHidalgo 您介意我将其改编为图书馆吗?你可以找到我的改编here
    • @NthPortal 认为它是开源的。所以,放心吧!如果这个想法的作者被收集在 repo 中会很好:D(你可以包括我的 twitter 句柄“pfcoperez”或堆栈溢出配置文件链接 - 以及夜莺的 - )
    • @NthPortal 顺便说一句,初始版本已经在公共仓库 github.com/Stratio/common-utils/blob/master/src/main/scala/com/… 中,您也可以在那里提交更新的版本;-)
    【解决方案3】:

    通过取消我猜你想粗暴地打断future

    找到这段代码:https://gist.github.com/viktorklang/5409467

    做了一些测试,似乎工作正常!

    享受:)

    【讨论】:

    • 你的代码段有使用这个函数吗?我在理解如何使用它时遇到了一些麻烦。
    • 只需调用该函数。它返回两个值,future 和 cancellor(您可以调用一个函数来取消正在运行的 future)。真的没有什么了。您无需了解它即可使用它。复制粘贴就行了。希望这会有所帮助
    • 这里的答案可能有助于理解如何使用它:stackoverflow.com/a/16050595/237399
    • Future 在你将它们与 flatMap、map、filter 等结合使用时非常有用。当你结合 Futures 时,上面的取消方法不起作用。例如 val f = Future { // 阻塞可中断计算 }.map(res => { // 再阻塞一次可中断计算 }) 如何取消未来 f?
    • 总的来说,我同意可以取消,但不推荐
    【解决方案4】:

    我认为使用Java 7 Future interface 及其实现可以降低实现的复杂性。

    Cancellable 可以构建一个 Java 未来,该未来将被其cancel 方法取消。另一个 future 可以等待它的完成,从而成为 observable 接口,它本身的状态是不可变的:

     class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
    
       private val jf: FutureTask[T] = new FutureTask[T](
         new Callable[T] {
           override def call(): T = todo
         }
       )
    
       executionContext.execute(jf)
    
       implicit val _: ExecutionContext = executionContext
    
       val future: Future[T] = Future {
         jf.get
       }
    
       def cancel(): Unit = jf.cancel(true)
    
     }
    
     object Cancellable {
       def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
         new Cancellable[T](executionContext, todo)
     }
    

    【讨论】:

    • 我还没有测试过这个,但它看起来很棒。但也许 jf.get 调用应该被包装到 scala.concurrent.blocking 块中以更好地衡量。
    • @nightingale 谢谢!我在生产中使用它github.com/Stratio/Common-utils/blob/master/src/main/scala/com/…github.com/Stratio/Crossdata/blob/…jf.get 在传递给 Future 构造函数的块内被调用,恕我直言,确实没有潜在阻塞的危险,是吗?
    • @nightingale 我已经检查了这个问题stackoverflow.com/questions/19681389/…,它(blocking)似乎可能适用。我去看看,谢谢你的建议:)
    • 是的,正如您找到的链接所说,我认为使用blocking 提示正是这种情况,以便底层线程池暂时增加工作人员的数量。不客气!
    • 查看我的答案以实现这个想法。我故意保留它,以便它仍然是您班级的替代品,但现在它应该使用一半的线程。希望它有效并且有用!
    猜你喜欢
    • 1970-01-01
    • 2021-01-18
    • 1970-01-01
    • 2014-05-29
    • 2011-09-07
    • 1970-01-01
    • 2015-10-10
    • 2014-07-24
    • 1970-01-01
    相关资源
    最近更新 更多