【问题标题】:Checking message query by actor in between calculations in scala/akka在 scala/akka 的计算之间检查参与者的消息查询
【发布时间】:2021-12-19 17:52:22
【问题描述】:

我有一个演员,当他收到一条消息时,他开始循环计算,并且他做了一段时间(就像他做了 100 次一样)。现在我需要他对可能尽快收到的其他消息做出反应。最好的方法是在他的循环中添加一些指令,例如“如果队列中有消息做出反应然后返回这里”,但我还没有看到这样的功能。

我认为演员可以向自己发送消息而不是循环,然后这些消息将在最后排队,他会在其间对其他消息做出反应,但我听说沟通很糟糕(更多时间消耗而不是计算)并且不知道与自我的交流是否如此。

我的问题是您如何看待这种解决方案,您还有其他想法如何处理计算之间的通信吗?

【问题讨论】:

    标签: scala akka actor


    【解决方案1】:

    不应在主接收方法中进行耗时的计算,因为它会降低系统的响应能力。将计算置于阻塞中 FutureTask 或其他异步对象,并在计算完成时向actor发送消息。当计算在不同的线程上继续时,actor 可以继续尽快处理消息。

    如果参与者需要在运行时(响应消息)修改计算,这会变得更加复杂,但解决方案取决于计算是什么以及需要什么样的修改,因此实际上不可能给出一个一般性的答案。

    【讨论】:

      【解决方案2】:

      通常在 Akka 中,您希望限制“每个单元”完成的工作量,在这种情况下,一个单元:

      • 处理消息的参与者
      • Future/Task 或相同的回调中完成的工作

      过长的工作单元很容易通过消耗一个线程来限制整个系统的响应能力。对于不消耗 CPU 但被阻塞等待 I/O 的任务,可以在不同的线程池中执行,但对于一些消耗 CPU 的工作,这并没有真正的帮助。

      因此,如果您正在执行循环,那么广义的方法是将循环的状态暂停到一条消息中并将其发送给您自己。它引入了一个小的性能损失(构建消息的延迟,将其发送给自己(保证为本地发送),并且在系统空闲时解构它可能会在微秒级),但可以改善整体系统延迟。

      例如,假设我们有一个演员将计算nth 斐波那契数。我正在使用 Akka Typed 来实现这一点,但广泛的原则适用于 Classic:

      object Fibonacci {
        sealed trait Command
      
        case class SumOfFirstN(n: Int, replyTo: ActorRef[Option[Long]]) extends Command
      
        private object Internal {
          case class Iterate(i: Int, a: Int, b: Int)  extends Command
          val initialIterate = Iterate(1, 0, 1)
        }
      
        case class State(waiting: SortedMap[Int, Set[ActorRef[Option[Long]]]]) {
          def behavior: Behavior[Command] =
            Behaviors.receive { (context, msg) =>
              msg match {
                case SumOfFirstN(n, replyTo) =>
                  if (n < 1) {
                    replyTo ! None
                    Behaviors.same
                  } else {
                    if (waiting.isEmpty) {
                      context.self ! Internal.initialIterate
                    }
      
                    val nextWaiting =
                      waiting.updated(n, waiting.get(n).fold(Set(replyTo))(_.incl(replyTo))
                    copy(waiting = nextWaiting).behavior
                  }
      
                case Internal.Iterate(i, a, b) =>
                  // the ith fibonacci number is b, the (i-1)th is a
                  if (waiting.rangeFrom(i).isEmpty) {
                    // Nobody waiting for this run to complete
                    if (waiting.nonEmpty) {
                      context.self ! Internal.initialIterate
                    }
      
                    Behaviors.same
                  } else {
                    var nextWaiting = waiting
                    var nextA = a
                    var nextB = b
      
                    (1 to 10).foreach { x =>
                      val next = nextA + nextB
      
                      nextWaiting.get(x + i).foreach { waiters =>
                        waiters.foreach(_ ! Some(next))
                      }
      
                      nextWaiting = nextWaiting.removed(x + i)
                      nextA = nextB
                      nextB = next
                    }
      
                    context.self ! Internal.Iterate(i + 10, nextA, nextB)
                    copy(waiting = nextWaiting)
                  }
              }
            }
        }
      }
      

      请注意,针对同一数字的多个请求(如果时间足够接近)只会计算一次,而中间结果的临时关闭请求不会导致额外计算。

      【讨论】:

      • 恕我直言,我认为这是对 Akka 可用于的系统类型的一个相当狭隘的看法。有很多应用程序需要大量不可分割的计算,但仍然可以使用 Akka 进行有效管理和控制。事实上,Akka 能够在这种环境中继续做出响应是它的强项之一。
      • 这个“解决方案”基本上是操作系统中线程调度算法的软件重新实现,几乎可以肯定比操作系统效率低,开销更大。如果您的系统需要所有内核 100% 的 CPU,那么像这样增加开销不是解决方案。如果没有,操作系统将能够足够频繁地调度 Akka 消息线程以保持响应能力,即使您有其他线程执行大量计算。
      • 您可以在主调度程序之外的线程上运行长时间的 CPU 密集型任务。我这样做的经验(将线程调度留给操作系统)并不是积极的,因为主调度程序中的线程饥饿显着增加。
      • 感觉像是您的特定操作系统/调度程序的问题,而不是一般问题,因为除非以某种方式给予饥饿的线程较低的优先级,否则您不应该遇到线程饥饿。
      • 任何时候操作系统调度一个不在主 Akka 调度程序中的线程,它都违反了 Akka 隐含的一个微妙假设,即如果调度该调度程序中的任何线程,所有这些线程都会被调度。现在,如果在该调度程序中几乎没有要做的事情,那没问题(您可以通过将主调度程序缩小另一个调度程序中预期的执行线程数来缓解这种情况),不会发生饥饿(我指的是饥饿)比线程饥饿检测器检测到的更普遍)。
      【解决方案3】:

      一个选项是委派任务,使用例如:Future,并使用一个单独的 ExecutionContext,其 fixed-pool-size(可在 application.conf 中配置)等于 CPU(或内核)的数量,以便计算使用可用的内核有效地完成。正如@Tim 所说,一旦计算完成,您可以通知主要参与者。

      另一种选择是让路由器后面的另一个参与者进行计算,同时将路由的数量限制为 CPU 的数量。

      一个简单的示例:

      object DelegatingSystem extends App {
      
        val sys = ActorSystem("DelegatingSystem")
      
        case class TimeConsuming(i: Int)
        case object Other
      
        class Worker extends Actor with ActorLogging {
          
          override def receive: Receive = {
            case message =>
              Thread.sleep(1000)
              log.info(s"$self computed long $message")
          }
        }
      
        class Delegator extends Actor with ActorLogging {
          //Set the number of routees to be equal to #of cpus
          val router: ActorRef = context.actorOf(RoundRobinPool(2).props(Props[Worker]))
      
          override def receive: Receive = {
            case message:TimeConsuming => router ! message
            case _ =>
              log.info("process other messages")
          }
        }
      
        val delegator = sys.actorOf(Props[Delegator])
        delegator ! TimeConsuming(1)
        delegator ! Other
        delegator ! TimeConsuming(2)
        delegator ! Other
        delegator ! TimeConsuming(3)
        delegator ! Other
        delegator ! TimeConsuming(4)
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-11-02
        • 2015-06-15
        • 1970-01-01
        • 2014-07-29
        • 1970-01-01
        • 2015-10-13
        • 2018-04-22
        • 2011-03-24
        相关资源
        最近更新 更多