【问题标题】:Akka message passing timingAkka 消息传递时间
【发布时间】:2015-08-05 19:39:48
【问题描述】:

我正在使用 Scala 和 Akka 进行人工生命模拟,到目前为止,我对两者都非常满意。我在时间方面遇到了一些问题,但我无法完全解释。

目前,我的模拟中的每只动物都是一对演员(动物 + 大脑)。通常,这两个参与者轮流进行(动物将传感器输入发送到大脑,等待结果,对其采取行动并重新开始)。然而,动物时不时地需要相互交流才能互相吃掉或繁殖。

对我来说奇怪的一件事是时机。事实证明,从一种动物向另一种动物发送信息比从动物向大脑发送信息要慢很多(大约 100 倍)。与素食者和无性动物相比,这使我可怜的捕食者和性活跃的动物处于不利地位(免责声明:我自己也是素食主义者,但我认为成为素食者有更好的理由,而不是在尝试打猎时陷入困境。 .).

我提取了一个演示问题的最小代码sn-p:

package edu.blindworld.test

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

class Animal extends Actor {
  val brain = context.actorOf(Props(classOf[Brain]))
  var animals: Option[List[ActorRef]] = None

  var brainCount = 0
  var brainRequestStartTime = 0L
  var brainNanos = 0L

  var peerCount = 0
  var peerRequestStartTime = 0L
  var peerNanos = 0L

  override def receive = {
    case Go(all) =>
      animals = Some(all)
      performLoop()
    case BrainResponse =>
      brainNanos += (System.nanoTime() - brainRequestStartTime)
      brainCount += 1
      // Animal interactions are rare
      if (Random.nextDouble() < 0.01) {
        // Send a ping to a random other one (or ourselves). Defer our own loop
        val randomOther = animals.get(Random.nextInt(animals.get.length))
        peerRequestStartTime = System.nanoTime()
        randomOther ! PeerRequest
      } else {
        performLoop()
      }
    case PeerResponse =>
      peerNanos += (System.nanoTime() - peerRequestStartTime)
      peerCount += 1
      performLoop()
    case PeerRequest =>
      sender() ! PeerResponse
    case Stop =>
      sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
      context.stop(brain)
      context.stop(self)
  }

  def performLoop() = {
    brain ! BrainRequest
    brainRequestStartTime = System.nanoTime()
  }
}

class Brain extends Actor {
  override def receive = {
    case BrainRequest =>
      sender() ! BrainResponse
  }
}

case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)

case object BrainRequest
case object BrainResponse

case object PeerRequest
case object PeerResponse

object ActorTest extends App {
  println("Sampling...")
  val system = ActorSystem("Test")
  val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
  animals.foreach(_ ! Go(animals))
  Thread.sleep(5000)
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
  val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
  val brainCount = stats.foldLeft(0)(_ + _.brainCount)
  val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
  val peerCount = stats.foldLeft(0)(_ + _.peerCount)
  val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
  println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
  println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
  system.shutdown()
}

这就是这里发生的事情:

  • 我正在创造 50 对动物/大脑演员
  • 它们都已启动并运行 5 秒
  • 每只动物都在无限循环,轮流使用它的大脑
  • 在所有运行的 1% 中,动物会向随机的其他动物发送 ping 并等待其回复。然后,它的大脑继续循环
  • 对大脑和对等体的每个请求都会进行测量,以便我们获得平均值
  • 5 秒后,一切都停止,并比较大脑请求和 ping 到同伴的时间

在我的双核 i7 上,我看到了这些数字:

大脑请求的平均时间:0.004708ms(从 21073859 个请求中采样)

对等 ping 的平均时间:0.66866 毫秒(从 211167 个请求中采样)

因此,对同伴的 ping 比对大脑的请求慢 165 倍。我一直在尝试很多事情来解决这个问题(例如优先邮箱和预热 JIT),但无法弄清楚发生了什么。有人有想法吗?

【问题讨论】:

  • 我注意到您在发送 BrainRequest 消息 之后 设置了 BrainRequestStartTime,但在发送 PeerRequest 消息之前 之前设置了 peerRequestStartTime - 你确定差异是' t 只是执行 PeerRequest 消息发送所需的时间?
  • 你可以使用PriorityMailbox
  • Shadowlands:我将 peerRequestStartTime 调低了,但没有任何区别。将消息放在某个演员的队列中似乎几乎是瞬间完成的
  • johny:我尝试继承 UnboundedPriorityMailbox 并优先考虑对等消息(请求和响应高于其他所有内容),但我无法衡量任何差异。我认为这是有道理的,因为邮箱大小不应该成为真正的问题,因为 BrainResponses 不应该累积,所以最坏的情况是任何 PeerRequest 都应该有一个 BrainResponse 在它前面。
  • 我可能错过了你的意图。System.nanoTime 稍慢。见stackoverflow.com/questions/19052316。检查TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis)是否更快。

标签: scala akka artificial-life


【解决方案1】:

我认为您应该使用询问模式来处理消息。在您的代码中,BrainRequest 被发送给大脑 actor,然后它发回 BrainResponse。问题就在这里。 BrainResponse 不是 BrainRequest 的响应。也许是之前 BrainRequest 的回应。

以下代码使用 ask 模式,perf 结果几乎相同。

package edu.blindworld.test

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

class Animal extends Actor {
  val brain = context.actorOf(Props(classOf[Brain]))
  var animals: Option[List[ActorRef]] = None

  var brainCount = 0
  var brainRequestStartTime = 0L
  var brainNanos = 0L

  var peerCount = 0
  var peerRequestStartTime = 0L
  var peerNanos = 0L

  override def receive = {
    case Go(all) =>
      animals = Some(all)
      performLoop()
    case PeerRequest =>
      sender() ! PeerResponse
    case Stop =>
      sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
      context.stop(brain)
      context.stop(self)
  }

  def performLoop(): Unit = {
    brainRequestStartTime = System.nanoTime()
    brain.ask(BrainRequest)(10.millis) onSuccess {
      case _ =>
        brainNanos += (System.nanoTime() - brainRequestStartTime)
        brainCount += 1
        // Animal interactions are rare
        if (Random.nextDouble() < 0.01) {
          // Send a ping to a random other one (or ourselves). Defer our own loop
          val randomOther = animals.get(Random.nextInt(animals.get.length))
          peerRequestStartTime = System.nanoTime()
          randomOther.ask(PeerRequest)(10.millis) onSuccess {
            case _ =>
              peerNanos += (System.nanoTime() - peerRequestStartTime)
              peerCount += 1
              performLoop()
          }
        } else {
          performLoop()
        }
    }
  }
}

class Brain extends Actor {
  override def receive = {
    case BrainRequest =>
      sender() ! BrainResponse
  }
}

case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)

case object BrainRequest
case object BrainResponse

case object PeerRequest
case object PeerResponse

object ActorTest extends App {
  println("Sampling...")
  val system = ActorSystem("Test")
  val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
  animals.foreach(_ ! Go(animals))
  Thread.sleep(5000)
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
  val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
  val brainCount = stats.foldLeft(0)(_ + _.brainCount)
  val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
  val peerCount = stats.foldLeft(0)(_ + _.peerCount)
  val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
  println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
  println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
  system.shutdown()
}

【讨论】:

  • 首先:非常感谢您的意见。我尝试了您的解决方案,现在确实时间相同,但是总消息吞吐量下降了 15 倍,所以我宁愿不将所有内容都切换到那个。我猜由于需要创建临时收件箱,询问模式的成本要高得多。
  • 我还验证了您关于 BrainRequests 错误关联的建议。为此,我将时间戳放入请求中并将其发送回响应中:发送:brain ! BrainRequest(System.nanoTime()) 接收:case BrainRequest(startNanos) =&gt; sender() ! BrainResponse(startNanos) 但这给了我与原始代码完全相同的时间。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-02-16
  • 1970-01-01
  • 2011-09-06
  • 2021-06-19
  • 2012-06-04
  • 2015-01-06
  • 1970-01-01
相关资源
最近更新 更多