【发布时间】:2026-02-05 19:50:02
【问题描述】:
我正在尝试以下 POC 来检查如何获得高并发
implicit def executionContext = context.system.dispatchers.lookup("async-futures-dispatcher")
implicit val timeout = 10 seconds
val contestroute = "/contestroute" {
get {
respondWithMediaType(`application/json`) {
dynamic {
onSuccess(
Future {
val start = System.currentTimeMillis()
// np here should be dealt by 200 threads defined below, so why
// overall time takes so long? why doesn't it really utilize all
// threads I have given to it? how to update the code so it
// utilizes the 200 threads?
Thread.sleep(5000)
val status = s"timediff ${System.currentTimeMillis() - start}ms ${Thread.currentThread().getName}"
status
}) { time =>
complete(s"status: $time")
}
}
}
}
}
我的配置:
async-futures-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 200
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 20.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 200
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
但是当我像这样运行 apache bench 时:
ab -n 200 -c 50 http://LAP:8080/contestroute
我得到的结果是:
Server Software: Apache-Coyote/1.1
Server Port:erred: 37500 bytes
HTML transferred: 10350 bytes
Requests per second: 4.31 [#/sec] (mean)
Time per request: 34776.278 [ms] (mean)
Time per request: 231.842 [ms] (mean, across all concurrent requests)
Transfer rate: 1.05 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 5 406 1021.3 7 3001
Processing: 30132 30466 390.8 30308 31231
Waiting: 30131 30464 391.8 30306 31231
Total: 30140 30872 998.9 30353 33228 8080
Document Path: /contestroute
Document Length: 69 bytes
Concurrency Level: 150
Time taken for tests: 34.776 seconds
Complete requests: 150
Failed requests: 0
Write errors: 0
Non-2xx responses: 150
Total transferred: 37500 bytes
HTML transferred: 10350 bytes
Requests per second: 4.31 [#/sec] (mean)
Time per request: 34776.278 [ms] (mean)
Time per request: 231.842 [ms] (mean, across all concurrent requests)
Transfer rate: 1.05 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 5 406 1021.3 7 3001
Processing: 30132 30466 390.8 30308 31231
Waiting: 30131 30464 391.8 30306 31231
Total: 30140 30872 998.9 30353 33228
我错过了什么大事吗?我需要更改什么才能让我的spray 和futures 使用我给它的所有线程?
(添加我在 tomcat servlet 3.0 之上运行)
【问题讨论】:
-
我知道这是示例代码,但为什么
sleep在作为Future执行的代码中。通过这样做,您基本上会使运行该 Future 的线程停止运行 5 秒,并且在这 5 秒内根本无法将其分配给另一个参与者/未来。 -
我正在模拟一个长时间运行的操作,所以我可以看看我是否可以利用 200 个线程来运行这个长时间运行的操作。
-
你确定 Spray 的底层 Actor 系统也没有选择
async-futures-dispatcher执行上下文吗?如果它无意中捡起它(因为它被定义为隐式),那么这可能是问题的一部分。正如您在此处所做的那样,将阻塞代码与 akka 主调度程序隔离到单独的执行上下文中是一个好主意,但如果 akka 也使用此执行上下文来运行代表此路由的参与者,则它不会达到目的。 -
我想指出,这里的整个场景是一个非常具体的用例。一般来说,拥有 200 个线程来做任何事情几乎总是错误的解决方案。在 Akka 中,“获得高并发”通常是通过将工作分解成更小的部分并将这些工作分配给 Actor 来实现的,尽可能避免阻塞。当您正确执行此操作时,不需要比一台机器上的 CPU/超线程数更多的线程。这是以最少的资源消耗获得最佳并发性的方法。
-
这也取决于你的实际瓶颈是什么。如果您的长时间运行的操作受 CPU 限制,则添加更多线程只会使情况恶化(更多线程争夺相同数量的 CPU 资源)。同样,如果您的负载受 IO 限制。在某些情况下,如果您只有一个阻塞的 DB 驱动程序,则如果您需要访问外部服务(如 DB),那么创建一个额外的线程池可能会很有用。在这种情况下,数据库本身是否能够并行处理负载,或者是否有必要在您身边设置一个队列来汇集数据库请求,这仍然是个问题。