【问题标题】:Akka HTTP: Blocking in a future blocks the serverAkka HTTP:将来阻塞会阻塞服务器
【发布时间】:2016-04-11 01:24:59
【问题描述】:

我正在尝试使用 Akka HTTP 对我的请求进行基本身份验证。 碰巧我有一个外部资源要通过身份验证,所以我必须对该资源进行休息调用。

这需要一些时间,在处理过程中,我的 API 的其余部分似乎已被阻止,正在等待此调用。 我用一个非常简单的例子重现了这一点:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

如果我发布到日志端点,我的 get 端点也会等待 5 秒,这是日志端点规定的。

这是预期的行为吗?如果是,我如何在不阻塞整个 API 的情况下进行阻塞操作?

【问题讨论】:

    标签: scala akka future akka-http


    【解决方案1】:

    您观察到的是预期的行为——当然它非常糟糕。存在已知的解决方案和最佳实践来防范它,这很好。在这个答案中,我想花一些时间来简短、详细地解释这个问题——阅读愉快!

    简短回答:“不要阻塞路由基础设施!”,始终使用专用调度程序来阻塞操作!

    观察到症状的原因:问题是您使用context.dispatcher 作为阻塞期货执行的调度程序。路由基础设施使用同一个调度程序(简单来说只是“一堆线程”)来实际处理传入的请求——因此,如果您阻塞所有可用线程,最终会导致路由基础设施匮乏。 (有待讨论和基准测试的事情是,如果 Akka HTTP 可以避免这种情况,我会将其添加到我的研究待办事项列表中。

    必须特别小心地处理阻塞,以免影响同一调度程序的其他用户(这就是为什么我们如此简单地将执行分离到不同的用户),如 Akka 文档部分所述:Blocking needs careful management

    我想在这里提请注意的另一件事是,应该尽可能避免阻塞 API - 如果您的长时间运行的操作不是真正的一个操作,而是一系列操作,您可以将它们分离到不同的参与者或排序的未来。无论如何,只是想指出——如果可能的话,避免这种阻塞调用,但如果你必须——那么下面解释了如何正确处理这些调用。

    深入分析及解决方案

    现在我们知道了问题所在,从概念上讲,让我们看看上面代码中到底有什么问题,以及这个问题的正确解决方案是怎样的:

    颜色 = 线程状态:

    • 绿松石 – 睡觉
    • 橙色 - 等待中
    • 绿色 - 可运行

    现在让我们研究 3 段代码以及它们如何影响调度程序和应用程序的性能。为了强制执行此行为,应用程序已置于以下负载下:

    • [a] 继续请求 GET 请求(请参阅上面最初问题中的代码),它不会在那里阻塞
    • [b] 然后在一段时间后触发 2000 个 POST 请求,这将导致 5 秒阻塞,然后返回未来

    1) [bad] 错误代码上的调度程序行为

    // BAD! (due to the blocking in Future):
    implicit val defaultDispatcher = system.dispatcher
    
    val routes: Route = post { 
      complete {
        Future { // uses defaultDispatcher
          Thread.sleep(5000)                    // will block on the default dispatcher,
          System.currentTimeMillis().toString   // starving the routing infra
        }
      }
    }
    

    所以我们将我们的应用程序暴露给 [a] 负载,您可以看到已经有许多 akka.actor.default-dispatcher 线程 - 它们正在处理请求 - 小的绿色 sn-p 和橙色表示其他线程实际上在那里闲置。

    然后我们启动 [b] 加载,这会导致这些线程阻塞 - 您可以看到早期线程“default-dispatcher-2,3,4”在之前空闲后进入阻塞状态。我们还观察到池在增长——新线程在“default-dispatcher-18,19,20,21...”启动,但它们立即进入睡眠状态(!)——我们在这里浪费了宝贵的资源!

    此类启动线程的数量取决于默认调度程序配置,但可能不会超过 50 左右。由于我们刚刚触发了 2k 阻塞操作,我们使整个线程池处于饥饿状态——阻塞操作占主导地位,以至于路由基础设施没有线程可用于处理其他请求——非常糟糕!

    让我们做点什么(顺便说一句,这是 Akka 的最佳实践——始终隔离阻塞行为,如下所示):

    2) [good!]调度程序行为良好的结构化代码/调度程序

    在您的application.conf 中配置此调度程序,专门用于阻止行为:

    my-blocking-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        // in Akka previous to 2.4.2:
        core-pool-size-min = 16
        core-pool-size-max = 16
        max-pool-size-min = 16
        max-pool-size-max = 16
        // or in Akka 2.4.2+
        fixed-pool-size = 16
      }
      throughput = 100
    }
    

    您应该阅读Akka Dispatchers 文档中的更多内容,以了解此处的各种选项。重点是我们选择了一个ThreadPoolExecutor,它对阻塞操作保持可用的线程有一个硬性限制。大小设置取决于您的应用执行的操作以及您的服务器有多少内核。

    接下来我们需要使用它,而不是默认的:

    // GOOD (due to the blocking in Future):
    implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
    
    val routes: Route = post { 
      complete {
        Future { // uses the good "blocking dispatcher" that we configured, 
                 // instead of the default dispatcher – the blocking is isolated.
          Thread.sleep(5000)
          System.currentTimeMillis().toString
        }
      }
    }
    

    我们使用相同的负载对应用施加压力,首先是一些正常请求,然后我们添加阻塞请求。这就是线程池在这种情况下的行为方式:

    所以最初正常请求很容易由默认调度程序处理,您可以在那里看到几条绿线 - 那是实际执行(我并没有真正将服务器置于重负载下,所以它大部分是空闲的)。

    现在,当我们开始发出阻塞操作时,my-blocking-dispatcher-* 会启动,并启动最多配置的线程数。它处理那里的所有睡眠。此外,在这些线程上没有任何事情发生一段时间后,它会关闭它们。如果我们用另一堆阻塞来攻击服务器,池将启动新线程来处理它们的 sleep()-ing,但与此同时——我们不会浪费我们宝贵的线程在“只是呆在那里并且什么都不做”。

    使用此设置时,正常 GET 请求的吞吐量不会受到影响,它们仍然可以在(仍然相当免费的)默认调度程序上愉快地提供服务。

    这是处理反应式应用程序中任何类型阻塞的推荐方法。它通常被称为“屏蔽”(或“隔离”)应用程序的不良行为部分,在这种情况下,不良行为是休眠/阻塞。

    3) [workaround-ish] 正确应用blocking 时的调度程序行为

    在这个例子中,我们使用scaladoc for scala.concurrent.blocking 方法,它可以在遇到阻塞操作时提供帮助。它通常会导致更多线程被启动以在阻塞操作中存活。

    // OK, default dispatcher but we'll use `blocking`
    implicit val dispatcher = system.dispatcher
    
    val routes: Route = post { 
      complete {
        Future { // uses the default dispatcher (it's a Fork-Join Pool)
          blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                     // but at the cost of exploding the number of threads (which eventually
                     // may also lead to starvation problems, but on a different layer)
            Thread.sleep(5000)
            System.currentTimeMillis().toString
           }
        }
      }
    }
    

    应用程序的行为如下:

    您会注意到创建了 很多 新线程,这是因为阻塞提示“哦,这将是阻塞的,所以我们需要更多线程”。这导致我们被阻塞的总时间比 1) 示例中的要小,但是在阻塞操作完成后我们有数百个线程什么都不做......当然,它们最终会被关闭(FJP 会这样做),但在一段时间内,我们将有大量(不受控制的)线程在运行,与 2) 解决方案相比,我们确切地知道有多少线程专门用于阻塞行为。

    总结:永远不要阻塞默认调度器:-)

    最佳实践是使用 2) 中显示的模式,为可用的阻塞操作提供一个调度程序,并在那里执行它们。

    讨论的 Akka HTTP 版本2.0.1

    使用的分析器:很多人私下问我这个答案,我用什么分析器来可视化上面图片中的线程状态,所以在这里添加这个信息:我使用了YourKit是一个很棒的商业分析器(OSS 免费),尽管您可以使用免费的VisualVM from OpenJDK 获得相同的结果。

    【讨论】:

    • 我们现在将此回复作为官方文档的一部分:doc.akka.io/docs/akka/2.4/scala/http/…
    • 以上链接失效。
    • 如果我想返回响应并继续在后台工作怎么办? This 似乎工作正常。
    • 是的,这很好。 Akka 流在不同的调度程序上工作。这应该是一个新的*问题,而不是劫持这个线程。
    • 在这里问似乎足够相关,但现在我创建了一个 separate question 用于超时。
    【解决方案2】:

    奇怪,但对我来说一切正常(没有阻塞)。这是代码:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import akka.stream.ActorMaterializer
    
    import scala.concurrent.Future
    
    
    object Main {
    
      implicit val system = ActorSystem()
      implicit val executor = system.dispatcher
      implicit val materializer = ActorMaterializer()
    
      val routes: Route = (post & entity(as[String])) { e =>
        complete {
          Future {
            Thread.sleep(5000)
            e
          }
        }
      } ~
        (get & path(Segment)) { r =>
          complete {
            "get"
          }
        }
    
      def main(args: Array[String]) {
    
        Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
          case e =>
            system.shutdown()
        }
      }
    }
    

    您还可以将异步代码包装到 onCompleteonSuccess 指令中:

    onComplete(Future{Thread.sleep(5000)}){e} 
    
    onSuccess(Future{Thread.sleep(5000)}){complete(e)}
    

    【讨论】:

    • 是的,这里也一样。我刚刚用 akka-http 2.0.1 测试了它
    • 您也可以尝试将 Future 包装到 onComplete/onSuccess 指令中。