【问题标题】:How do I wrap a java.util.concurrent.Future in an Akka Future?如何将 java.util.concurrent.Future 包装在 Akka Future 中?
【发布时间】:2023-12-02 12:47:01
【问题描述】:

在 Play Framework 2.0.1 (Scala) 应用程序中,我们使用了一个 Web 服务客户端库,它返回 java.util.concurrent.Future 作为响应。

我们不想在 get() 调用中阻止 Play 应用,而是将 j.u.c.Future 包装在 akka.dispatch.Future 中,以便我们可以轻松使用 play 框架的 AsyncResult 处理。

以前有没有人这样做过,或者有库或示例代码?


更新:我们发现最接近的是这个 google 群组讨论:https://groups.google.com/forum/#!topic/play-framework/c4DOOtGF50c

...如果您所拥有的只是一个普通的 j.u.c.Future,那么创建非阻塞解决方案的最佳方法是采用 j.u.c.Future 和一个 Promise,并将它们提供给运行轮询循环的某个线程,该线程将完成用 Future 完成后的结果承诺。

有没有人有这方面的示例实现?

【问题讨论】:

    标签: scala playframework-2.0 akka future


    【解决方案1】:

    @Viktor Klang:我们知道j.u.c.Future 是可憎的。但这就是我们从一个我们必须暂时接受的软件中得到的回报。

    到目前为止,这是我们一起破解的:

    def wrapJavaFutureInAkkaFuture[T](javaFuture: java.util.concurrent.Future[T], maybeTimeout: Option[Duration] = None)(implicit system: ActorSystem): akka.dispatch.Future[T] = {
      val promise = new akka.dispatch.DefaultPromise[T]
      pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout.map(_.fromNow))
      promise
    }
    

    换句话说,创建一个单独的 Akka PromiseFuture 的写入端)对应于 j.u.c.Future,启动回调 pollJavaFutureUntilDoneOrCancelled 以通过轮询“可憎”来更新 Promise,并将 Promise 返回给调用者。

    那么我们如何“轮询”以根据 j.u.c.Future 的状态更新 Akka Promise?

    def pollJavaFutureUntilDoneOrCancelled[T](javaFuture: java.util.concurrent.Future[T], promise: akka.dispatch.Promise[T], maybeDeadline: Option[Deadline] = None)(implicit system: ActorSystem) {
      if (maybeDeadline.exists(_.isOverdue)) javaFuture.cancel(true);
    
      if (javaFuture.isDone || javaFuture.isCancelled) {
        promise.complete(allCatch either { javaFuture.get })
      } else {
        Play.maybeApplication.foreach { implicit app =>
          system.scheduler.scheduleOnce(50 milliseconds) {
            pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeDeadline)
          }
        }
      }
    }
    

    这是对我在问题中引用的 google 群组讨论中暗示的内容的尝试。它使用 Akka 调度程序每 50 毫秒回调一次,以检查 j.u.c.Future 是完成还是取消。每当发生这种情况时,它都会将 Akka Promise 更新为完成状态。

    @Victor Klang 等人:

    这是最佳做法吗?你知道更好的方法吗?我们是否错过了我们应该知道的缺点?

    感谢您提供更多帮助。

    【讨论】:

    • 一个明显的缺点是,在最坏的情况下,这会导致响应的高度延迟。例如,如果您有默认设置,并且您的未来在检查后 1 毫秒完成,则可能会导致大约 100 毫秒的延迟。然而,这可以通过在配置中设置 scheduler.tick-duration 设置来调整。
    • @drexin 是的,但是在任何基于轮询的解决方案中都会存在滴答持续时间和轮询频率的权衡,对吧?
    • 当然,但是当您询问缺点时,我只想告诉您,它不仅取决于 scheduleOnce 调用的延迟参数,还取决于 akka 配置中的设置。如果您可以忍受延迟,这应该是一个可用的解决方案。
    【解决方案2】:

    您应该使用akka.dispatch.Futures.future()java.util.concurrent.Callable

    val akkaFuture: akka.dispatch.Future[String] = akka.dispatch.Futures.future(
      new java.util.concurrent.Callable[String] {
        def call: String = {
          return "scala->" + javaFuture.get
        }
    }, executionContext)
    

    Gist for complete example

    【讨论】:

    • 这本质上会导致线程数量超出必要的数量,其中一个会阻塞,并且并不比在主线程上简单地调用 javaFuture.get 更好。除非在绝对需要组件兼容性的极端情况下,在这里引入 akka future 并没有什么好处。