【问题标题】:How to achieve high concurrency with spray.io in this Future and Thread.sleep example?在这个 Future 和 Thread.sleep 示例中,如何使用 spray.io 实现高并发?
【发布时间】:2026-02-05 19:50:02
【问题描述】:

我正在尝试以下 POC 来检查如何获得高并发

  implicit def executionContext = context.system.dispatchers.lookup("async-futures-dispatcher")
  implicit val timeout = 10 seconds


  val contestroute = "/contestroute" {
      get {
          respondWithMediaType(`application/json`) {
            dynamic {
                onSuccess(
                  Future {
                    val start = System.currentTimeMillis()
                    // np here should be dealt by 200 threads defined below, so why 
                    // overall time takes so long? why doesn't it really  utilize all
                    // threads I have given to it? how to update the code so it 
                    // utilizes the 200 threads?
                    Thread.sleep(5000) 
                    val status = s"timediff ${System.currentTimeMillis() - start}ms ${Thread.currentThread().getName}"
                    status
                  })  { time =>
                  complete(s"status: $time")
                }
            }
          }
      }
  }

我的配置:

async-futures-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 200
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 20.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 200
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

但是当我像这样运行 apache bench 时:

ab -n 200 -c 50 http://LAP:8080/contestroute

我得到的结果是:

Server Software:        Apache-Coyote/1.1
Server Port:erred:      37500 bytes
HTML transferred:       10350 bytes
Requests per second:    4.31 [#/sec] (mean)
Time per request:       34776.278 [ms] (mean)
Time per request:       231.842 [ms] (mean, across all concurrent requests)
Transfer rate:          1.05 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        5  406 1021.3      7    3001
Processing: 30132 30466 390.8  30308   31231
Waiting:    30131 30464 391.8  30306   31231
Total:      30140 30872 998.9  30353   33228            8080

Document Path:          /contestroute
Document Length:        69 bytes

Concurrency Level:      150
Time taken for tests:   34.776 seconds
Complete requests:      150
Failed requests:        0
Write errors:           0
Non-2xx responses:      150
Total transferred:      37500 bytes
HTML transferred:       10350 bytes
Requests per second:    4.31 [#/sec] (mean)
Time per request:       34776.278 [ms] (mean)
Time per request:       231.842 [ms] (mean, across all concurrent requests)
Transfer rate:          1.05 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        5  406 1021.3      7    3001
Processing: 30132 30466 390.8  30308   31231
Waiting:    30131 30464 391.8  30306   31231
Total:      30140 30872 998.9  30353   33228

我错过了什么大事吗?我需要更改什么才能让我的sprayfutures 使用我给它的所有线程?

(添加我在 tomcat servlet 3.0 之上运行)

【问题讨论】:

  • 我知道这是示例代码,但为什么 sleep 在作为 Future 执行的代码中。通过这样做,您基本上会使运行该 Future 的线程停止运行 5 秒,并且在这 5 秒内根本无法将其分配给另一个参与者/未来。
  • 我正在模拟一个长时间运行的操作,所以我可以看看我是否可以利用 200 个线程来运行这个长时间运行的操作。
  • 你确定 Spray 的底层 Actor 系统也没有选择 async-futures-dispatcher 执行上下文吗?如果它无意中捡起它(因为它被定义为隐式),那么这可能是问题的一部分。正如您在此处所做的那样,将阻塞代码与 akka 主调度程序隔离到单独的执行上下文中是一个好主意,但如果 akka 也使用此执行上下文来运行代表此路由的参与者,则它不会达到目的。
  • 我想指出,这里的整个场景是一个非常具体的用例。一般来说,拥有 200 个线程来做任何事情几乎总是错误的解决方案。在 Akka 中,“获得高并发”通常是通过将工作分解成更小的部分并将这些工作分配给 Actor 来实现的,尽可能避免阻塞。当您正确执行此操作时,不需要比一台机器上的 CPU/超线程数更多的线程。这是以最少的资源消耗获得最佳并发性的方法。
  • 这也取决于你的实际瓶颈是什么。如果您的长时间运行的操作受 CPU 限制,则添加更多线程只会使情况恶化(更多线程争夺相同数量的 CPU 资源)。同样,如果您的负载受 IO 限制。在某些情况下,如果您只有一个阻塞的 DB 驱动程序,则如果您需要访问外部服务(如 DB),那么创建一个额外的线程池可能会很有用。在这种情况下,数据库本身是否能够并行处理负载,或者是否有必要在您身边设置一个队列来汇集数据库请求,这仍然是个问题。

标签: scala akka spray


【解决方案1】:

在您的示例中,所有喷射操作和阻塞操作都发生在同一上下文中。您需要拆分 2 个上下文:

我也看不出使用动态的原因,我想“完整”应该是好的。

 implicit val timeout = 10.seconds

  // Execution Context for blocking ops
  val blockingExecutionContext = {
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2000))
  }

  // Execution Context for Spray
  import context.dispatcher

  override def receive: Receive = runRoute(contestroute)

  val contestroute = path("contestroute") {
    get {

        complete {
          Future.apply {
            val start = System.currentTimeMillis()
            // np here should be dealt by 200 threads defined below, so why
            // overall time takes so long? why doesn't it really  utilize all
            // threads I have given to it? how to update the code so it
            // utilizes the 200 threads?
            Thread.sleep(5000)
            val status = s"timediff ${System.currentTimeMillis() - start}ms ${Thread.currentThread().getName}"
            status
          }(blockingExecutionContext)
        }

    }
  }

之后您可以使用它进行测试

ab -n 200 -c 200 http://LAP:8080/contestroute

你会看到 Spray 会创建所有 200 个线程用于阻塞操作

结果:

Concurrency Level:      200
Time taken for tests:   5.096 seconds

【讨论】:

  • 小修正:“创建所有 200 个线程”不是喷雾,而是 Scala/Akka 的一个特性。
  • 这和我在评论中所说的差不多,spray 也使用了发生阻塞的上下文,因此严重限制了系统的吞吐量。
  • 这种方法不是意味着单线程仍在处理所有路由(=瓶颈+阻塞所有请求的持续危险)吗?为了实现高并发,服务器不应该尽快将传入请求的管理交给一个新线程吗?