通常在 Akka 中,您希望限制“每个单元”完成的工作量,在这种情况下,一个单元:
- 处理消息的参与者
- 在
Future/Task 或相同的回调中完成的工作
过长的工作单元很容易通过消耗一个线程来限制整个系统的响应能力。对于不消耗 CPU 但被阻塞等待 I/O 的任务,可以在不同的线程池中执行,但对于一些消耗 CPU 的工作,这并没有真正的帮助。
因此,如果您正在执行循环,那么广义的方法是将循环的状态暂停到一条消息中并将其发送给您自己。它引入了一个小的性能损失(构建消息的延迟,将其发送给自己(保证为本地发送),并且在系统空闲时解构它可能会在微秒级),但可以改善整体系统延迟。
例如,假设我们有一个演员将计算nth 斐波那契数。我正在使用 Akka Typed 来实现这一点,但广泛的原则适用于 Classic:
object Fibonacci {
sealed trait Command
case class SumOfFirstN(n: Int, replyTo: ActorRef[Option[Long]]) extends Command
private object Internal {
case class Iterate(i: Int, a: Int, b: Int) extends Command
val initialIterate = Iterate(1, 0, 1)
}
case class State(waiting: SortedMap[Int, Set[ActorRef[Option[Long]]]]) {
def behavior: Behavior[Command] =
Behaviors.receive { (context, msg) =>
msg match {
case SumOfFirstN(n, replyTo) =>
if (n < 1) {
replyTo ! None
Behaviors.same
} else {
if (waiting.isEmpty) {
context.self ! Internal.initialIterate
}
val nextWaiting =
waiting.updated(n, waiting.get(n).fold(Set(replyTo))(_.incl(replyTo))
copy(waiting = nextWaiting).behavior
}
case Internal.Iterate(i, a, b) =>
// the ith fibonacci number is b, the (i-1)th is a
if (waiting.rangeFrom(i).isEmpty) {
// Nobody waiting for this run to complete
if (waiting.nonEmpty) {
context.self ! Internal.initialIterate
}
Behaviors.same
} else {
var nextWaiting = waiting
var nextA = a
var nextB = b
(1 to 10).foreach { x =>
val next = nextA + nextB
nextWaiting.get(x + i).foreach { waiters =>
waiters.foreach(_ ! Some(next))
}
nextWaiting = nextWaiting.removed(x + i)
nextA = nextB
nextB = next
}
context.self ! Internal.Iterate(i + 10, nextA, nextB)
copy(waiting = nextWaiting)
}
}
}
}
}
请注意,针对同一数字的多个请求(如果时间足够接近)只会计算一次,而中间结果的临时关闭请求不会导致额外计算。