【问题标题】:Play Framework: What happens when requests exceeds the available threadsPlay Framework:当请求超过可用线程时会发生什么
【发布时间】:2014-08-26 01:44:21
【问题描述】:

我在线程池服务阻塞请求中有一个线程。

  def sync = Action {
    import Contexts.blockingPool
    Future { 
        Thread.sleep(100)
    } 
    Ok("Done")
  }

在 Contexts.blockingPool 中配置为:

custom-pool {
    fork-join-executor {
            parallelism-min = 1
            parallelism-max = 1
    }
}

理论上,如果上述请求同时收到 100 个请求,预期的行为应该是:1 个请求应该休眠(100),其余 99 个请求应该被拒绝(或排队直到超时?)。但是我观察到额外的工作线程被创建来服务其余的请求。我还观察到,随着池中的线程数小于接收到的请求数,延迟会增加(服务请求变慢)。

如果收到的请求大于配置的线程池大小,预期会发生什么行为?

【问题讨论】:

    标签: multithreading scala playframework-2.0 threadpool


    【解决方案1】:

    您的测试结构不正确,无法测试您的假设。 如果您查看this section in the docs,您会看到 Play 有一些线程池/执行上下文。对于您的问题,重要的是默认线程池以及它与您的操作所服务的 HTTP 请求有何关系。

    正如文档所述,默认线程池是所有应用程序代码默认运行的地方。 IE。所有动作代码,包括所有Future's(没有明确定义自己的执行上下文),都将在这个执行上下文/线程池中运行。所以用你的例子:

    def sync = Action {
    
      // *** import Contexts.blockingPool
      // *** Future { 
      // *** Thread.sleep(100)
      // ***} 
    
      Ok("Done")
    }
    

    // *** 注释的操作中的所有代码都将在默认线程池中运行。 IE。当请求被路由到您的操作时:

    1. FutureThread.sleep 将被分派到您的自定义执行上下文
    2. 然后无需等待 Future 完成(因为它在自己的线程池 [Context.blockingPool] 中运行,因此不会阻塞默认线程池上的任何线程)
    3. 您的 Ok("Done") 语句被评估并且客户端收到响应
    4. 大约。收到响应 100 毫秒后,您的 Future 完成

    因此,为了解释您的观察,当您同时发送 100 个请求时,Play 将很乐意接受这些请求,路由到您的控制器操作(在 默认线程池上执行),发送到您的 Future然后回复客户端。

    默认池的默认大小为

    play {
      akka {
        ...
        actor {
          default-dispatcher = {
            fork-join-executor {
              parallelism-factor = 1.0
              parallelism-max = 24
            }
          }
        }
      }
    }
    

    每个内核使用 1 个线程,最多 24 个。 鉴于您的操作很少(不包括Future),您将能够毫不费力地处理每秒 1000 个请求。但是,您的 Future 将需要更长的时间来处理积压,因为您阻塞了自定义池中的唯一线程 (blockingPool)。

    如果您使用我对您的操作稍作调整的版本,您将在日志输出中看到确认上述说明的内容:

    object Threading {
    
      def sync = Action {
        val defaultThreadPool = Thread.currentThread().getName;
    
        import Contexts.blockingPool
        Future {
          val blockingPool = Thread.currentThread().getName;
          Logger.debug(s"""\t>>> Done on thread: $blockingPool""")
          Thread.sleep(100)
        }
    
        Logger.debug(s"""Done on thread: $defaultThreadPool""")
        Results.Ok
      }
    }
    
    object Contexts {
      implicit val blockingPool: ExecutionContext = Akka.system.dispatchers.lookup("blocking-pool-context")
    }
    

    您的所有请求都会先得到迅速处理,然后您的Future会一个一个完成。

    总之,如果你真的想测试 Play 将如何处理多个并发请求,而只有一个线程处理请求,那么你可以使用以下配置:

    play {
      akka {
        akka.loggers = ["akka.event.Logging$DefaultLogger", "akka.event.slf4j.Slf4jLogger"]
        loglevel = WARNING
        actor {
          default-dispatcher = {
            fork-join-executor {
              parallelism-min = 1
              parallelism-max = 1
            }
          }
        }
      }
    }
    

    您可能还想像这样在您的操作中添加Thread.sleep(以减慢默认线程池孤独线程的速度)

        ...
        Thread.sleep(100)
        Logger.debug(s"""<<< Done on thread: $defaultThreadPool""")
        Results.Ok
    }
    

    现在您将有 1 个线程用于请求,1 个线程用于您的 Future's。 如果您在高并发连接下运行此程序,您会注意到客户端阻塞,而 Play 会一一处理请求。这是你期望看到的......

    【讨论】:

    • 非常详细且有据可查的答案!谢谢!!
    【解决方案2】:

    Play 使用 AkkaForkJoinPool 扩展 scala.concurrent.forkjoin.ForkJoinPool。 它有内部任务队列。 您可能还会发现这个描述对于通过 fork-join-pool 处理阻塞代码很有趣:Scala: the global ExecutionContext makes your life easier

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-06-15
      • 2018-07-21
      • 2017-09-03
      • 2012-06-24
      • 2018-10-25
      • 1970-01-01
      • 1970-01-01
      • 2017-02-14
      相关资源
      最近更新 更多