【发布时间】:2013-09-05 22:12:54
【问题描述】:
我使用 Akka 已经有一段时间了,而且在更简单的项目中,我只使用了 Scala 2.10 期货。然而,前几天我不得不将返回 Futures 的库与 Akka Actors 混合在一起,我不知道如何集成 Actor 调度系统和失控的 futures。
在项目中使用 Akka Actor 的选择是能够微调 Actor 队列并控制代码的并行化,并且可能最终横向扩展(即使现在它不是优先级)。但是,如果我有这样的代码(故意简化):
package externallib
object DoSomething {
def foo(): Future[Something] = ....
}
package myapp
class MyActor extends Actor {
def receive = {
case "message" => externallib.DoSomething.foo() pipeTo sender
}
}
.../
val actorRef = system.actorOf(Props[MyActor].withRouter(
RoundRobinRouter(nrOfInstances = 5)))
(0 to 20000) foreach {
val futureSomething = actorRef ? "message"
}
那么 MyActor 实际上并没有做任何事情,除了在 externallib 中生成一个线程并直接返回。 externallib 未来最终会将结果通过管道传递给参与者的调用者。
但是,通过这种方式,管理 Actor 的受良好控制的路由器并没有真正控制生成的线程,因为它们是在 Actor 系统之外生成的,即使在同一个 ExecutionContext 中也是如此。在大循环的示例中,这意味着不是将消息排队发送给参与者,而是将快速消耗这些消息并在任何严格控制的队列之外生成 20000 个线程。
我在想我可以做这样的事情:
class MyActor extends Actor {
def receive = {
case "message" =>
val res = Await.result(externallib.DoSomething.foo(), someDuration)
sender ! res
}
}
这将确保在 externallib 完成(或超时)之前不会向参与者发送新的 "message"。但是,这实际上可能需要两个线程(演员的一个和 DoSomething 中的一个)来等待单个计算。
有没有更好的方法来控制这些在参与者系统之外产生的期货?
【问题讨论】:
-
DoSomething 使用的是什么 ExecutionContext?
-
假设 lib 以我们可以指定执行上下文的方式实现,并且参与者提供调度程序执行上下文。除非有更好的方法;)
-
你可以创建一个 ExecutionContext,它的 execute 方法只是将 runnable 发送给 Actor(reportFailure 委托给 Actor 的调度程序的 reportFailure),然后 Actor 可以接收 Runnable 并执行它们。
标签: multithreading scala threadpool akka future