【问题标题】:Cancellation with Future and Promise in ScalaScala 中的 Future 和 Promise 取消
【发布时间】:2018-05-26 09:19:10
【问题描述】:

这是my previous question 的后续行动。

假设我有一个任务,它执行一个 interruptible 阻塞调用。我想将它作为Future 运行并取消它使用Promisefailure 方法。

我希望 cancel 按如下方式工作:

  • 如果有人取消任务完成之前我希望任务“立即”完成,如果阻塞调用已经开始,我会中断它希望Future 调用onFailure

  • 如果有人取消任务任务完成后我想得到一个状态,说取消失败,因为任务已经完成。

这有意义吗?是否可以在 Scala 中实现?有没有这种实现的例子?

【问题讨论】:

    标签: scala promise akka future cancellation


    【解决方案1】:

    scala.concurrent.Future 是只读的,因此一个读者不能为其他读者搞砸事情。

    看起来你应该能够实现你想要的如下:

    def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
      val p = Promise[T]()
      val f = p.future
      p tryCompleteWith Future(fun(f))
      (f, () => p.tryFailure(new CancellationException))
    }
    
    val (f, cancel) = cancellableFuture( future => {
      while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag
    
      result  // when we're done, return some result
    })
    
    val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)
    

    【讨论】:

    • 谢谢。假设我执行一些 interruptible 阻塞调用而不是计算。如何修改上面的代码来中断线程?
    • 您必须添加一个同步变量,在计算开始时将当前线程设置为锁定状态,然后在结束时获取锁定并清除变量。并且取消将获取锁定并在设置的线程上调用中断(如果有),或者如果为空则退出。
    • 应该是while(!future.isCompleted && moreWork) continueCalculation
    • sourcedelica:AtomicBoolean 是可变的,因此这可能不是最佳选择,Future 的原因是它已经分配并且不会以任何方式干扰。
    • @FranciscoLópez-Sancho 在代码中,返回的 Future 将包含计算结果,或者 CancellationException。
    【解决方案2】:

    这是 Victor 的 cmets 代码的可中断版本(Victor,如果我误解了,请纠正我)。

    object CancellableFuture extends App {
    
      def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
        val p = Promise[T]()
        val f = p.future
        val aref = new AtomicReference[Thread](null)
        p tryCompleteWith Future {
          val thread = Thread.currentThread
          aref.synchronized { aref.set(thread) }
          try fun() finally {
            val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
            //Deal with interrupted flag of this thread in desired
          }
        }
    
        (f, () => {
          aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
          p.tryFailure(new CancellationException)
        })
      }
    
      val (f, cancel) = interruptableFuture[Int] { () =>
        val latch = new CountDownLatch(1)
    
        latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptable
        println("latch timed out")
    
        42  // Completed
      }
    
      f.onFailure { case ex => println(ex.getClass) }
      f.onSuccess { case i => println(i) }
    
      Thread.sleep(6000)   // Set to less than 5000 to cancel
    
      val wasCancelled = cancel()
    
      println("wasCancelled: " + wasCancelled)
    }
    

    Thread.sleep(6000) 的输出是:

    latch timed out
    42
    wasCancelled: false
    

    Thread.sleep(1000) 的输出是:

    wasCancelled: true
    class java.util.concurrent.CancellationException
    

    【讨论】:

    • 谢谢。那绝对更干净。
    • 根据 Victor 的要点更新。
    • 为什么@ViktorKlang 的答案没有更新以反映他自己的cmets 和要点?!当只有一个作者时,两个答案似乎相互竞争:(
    • 它们是两个不同的变体,它们不相互竞争。 Viktor's 使用cancel() 方法取消操作,我使用Thread.interrupt
    【解决方案3】:

    Twitter 的期货实施取消。看看这里:

    https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

    第 563 行显示了负责此操作的抽象方法。 Scala 的 futures 目前不支持取消。

    【讨论】:

    【解决方案4】:

    您可以使用 Monix 库代替 Future

    https://monix.io

    【讨论】:

      猜你喜欢
      • 2013-02-14
      • 2013-04-07
      • 2017-11-15
      • 2017-01-19
      • 1970-01-01
      • 2014-11-10
      • 2021-10-27
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多