【问题标题】:Cannot recover from RejectedExecutionException for some reason由于某种原因无法从 RejectedExecutionException 中恢复
【发布时间】:2023-03-15 15:45:01
【问题描述】:

所以,我正在尝试编写一个一直运行直到你告诉它停止的任务:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import java.util.concurrent.RejectedExecutionException

def runUntilShutdown(f: => Unit) = {
  val ctx = ExecutionContext.fromExecutorService(null)
  import ExecutionContext.global
  def runTask(): Future[Unit] = Future(f)(ctx)
    .flatMap(_ => runTask())(ctx)
  runTask()
    .recover { case _: RejectedExecutionException => () }(global)
    .onComplete { _ => println("Done") }(global)
  ctx
}

val ctx = runUntilShutdown(Thread.sleep(1000))
ctx.shutdown

我希望这只是在最后打印“完成”,但这永远不会发生。

相反,RejectedExecutionException 的堆栈跟踪被转储到 stderr:

java.util.concurrent.RejectedExecutionException
at scala.concurrent.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1870)
at scala.concurrent.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
at scala.concurrent.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2973)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
at scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1361)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

想法?

【问题讨论】:

    标签: scala concurrency future


    【解决方案1】:

    此问题已在 Scala 2.13* 中通过 Future & Promise 的新实现得到修复, 您可以在 Scala 2.13.0-M5 上尝试您的示例,但您必须在您的 EC 上调用 shutdownNow 否则它将继续运行,因为它不会接受新任务,但它已经在运行您的 Future .

    示例输出:

    Welcome to Scala 2.13.0-20181205-121558-76b34c4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144).
    Type in expressions for evaluation. Or try :help.
    
    scala> import scala.concurrent.Future
    import scala.concurrent.Future
    
    scala> import scala.concurrent.ExecutionContext
    import scala.concurrent.ExecutionContext
    
    scala> import java.util.concurrent.RejectedExecutionException
    import java.util.concurrent.RejectedExecutionException
    
    scala> def runUntilShutdown(f: => Unit) = {
         |   val ctx = ExecutionContext.fromExecutorService(null)
         |   import ExecutionContext.global
         |   def runTask(): Future[Unit] = Future(f)(ctx)
         |     .flatMap(_ => runTask())(ctx)
         |   runTask()
         |     .recover { case _: RejectedExecutionException => () }(global)
         |     .onComplete { _ => println("Done") }(global)
         |   ctx
         | }
    runUntilShutdown: (f: => Unit)scala.concurrent.ExecutionContextExecutorService
    
    scala> val ctx = runUntilShutdown(Thread.sleep(1000))
    ctx: scala.concurrent.ExecutionContextExecutorService = scala.concurrent.impl.ExecutionContextImpl$$anon$3@23d060c2[Running, parallelism = 8, size = 1, active = 1, running = 0, steals = 0, tasks = 0, submissions = 0]
    
    scala> ctx.shutdownNow
    res2: java.util.List[Runnable] = []
    
    scala> Done
    

    *: https://github.com/scala/bug/issues/9071(无法通过 Future & Promise 的旧实现来实现正确的行为,因此目前没有计划向后移植到 2.12。)

    【讨论】:

    • 啊,很好...我以为我疯了 :) 回复。 showdownNow:flatMap一个新任务,不是吗? (否则,它是否需要上下文?)。我想让当前的任务完成,并让flatMap 无法开始下一个任务......但是我要过一段时间才能获得 2.13 的任何东西,所以,在这一点上,这一切都相当学术。回到我的丑陋var stopped = false :(
    • 从新创建的 Runnable 提交到 execute 方法的意义上来说,这不一定是一项新任务——这就是为什么您需要执行 shutdownNow 以便立即关闭所有内容的原因。
    【解决方案2】:

    看起来你有一个永远不会结束的递归def runTask(): Future[Unit] = Future(f)(ctx).flatMap(_ => runTask())(ctx)。因此,不会打印Done 的输出,因为未来永远不会完成。

    本例正确处理异常

      import scala.concurrent.Future
      import scala.concurrent.ExecutionContext
      import java.util.concurrent.RejectedExecutionException
      import java.util.concurrent.Executors
    
      def runUntilShutdown(f: () => Unit) = {
        implicit val ctx = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
        Future { f() }
          .recover { case _: RejectedExecutionException => () }
          .onComplete { _ =>
            println("Done")
          }
        ctx
      }
    
      val ctx = runUntilShutdown { () =>
        Thread.sleep(10000)
      }
      ctx.shutdown()
    

    【讨论】:

    • 它的哪一部分应该做循环?
    • @Andrey Tyukin 问题是关于恢复而不是循环。如果任务由于递归而永远无法完成,则无法恢复。它一次又一次地运行,却没有到达.recover 中的代码。因此,当上下文关闭时会打印异常。
    • @IvanStanislavciuc 从技术上讲,它并不是真正的递归(不涉及堆栈)。它只是不断地将作业重新提交给执行者。你错了,它“永远不会完成”——当执行程序关闭时,链将终止,这就是我期望它到recover,并打印出消息(最后一部分不起作用——因此我的问题)。是的,这个问题“关于循环”。您的实现做了一些不同的事情。顺便说一句,它最后也不会打印出“完成”(但是,也没有堆栈跟踪转储到 stderr - 谜团变浓了:))。
    • 啊,我知道,您只是在f 之后错过了括号(我编辑了您的答案以解决该问题)-这就是'为什么没有例外-它只是马上完成。现在,行为与我的情况相同-stderr 上的堆栈回溯,最后没有“完成”消息。不过,这与我所说的情况仍然不同:请注意我的问题中有两个执行上下文?这很重要。你的失败是因为你试图在 .recover 已经关闭后使用相同的ctx
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-08
    • 2019-06-03
    • 1970-01-01
    • 1970-01-01
    • 2018-12-05
    • 2015-09-30
    相关资源
    最近更新 更多