【问题标题】:Akka: waiting for multiple messagesAkka:等待多条消息
【发布时间】:2013-12-07 18:02:02
【问题描述】:

嗨,akka 大师:) 你能指导我吗?

我正在尝试做的事情 - 演员 A 向演员 B 请求消息,然后等待消息返回。但是,不知何故,演员 B 给 A 的不是一条消息,而是其中的 4 条。演员 A Future 正确完成,但其余 3 条消息被视为死信。为什么?这是正确的吗?我的意思是,演员 A 有一个合适的处理程序,那为什么这些字母是死的呢? :-(

[信息] [11/22/2013 22:00:38.975] [ForkJoinPool-2-worker-7] [akka://actors/user/a] 得到结果 pong [INFO] [11/22/2013 22:00:38.976] [actors-akka.actor.default-dispatcher-4] [akka://actors/deadLetters] 消息 [java.lang.String] 来自 演员[akka://actors/user/b#-759739990] 到 演员[akka://actors/deadLetters] 未交付。 [1] 死信 遭遇。可以关闭或调整此日志记录 配置设置“akka.log-dead-letters”和 'akka.log-dead-letters-during-shutdown'。 ...同样的消息 2 次以上...

请看代码。

package head_thrash

import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
  val system = ActorSystem("actors")

  val a = system.actorOf(Props[A], "a")
  val b = system.actorOf(Props[B], "b")

  a ! "ping"
  system.awaitTermination()
}

class A extends Actor with ActorLogging {

  implicit val timeout = Timeout(5.seconds)

  def receive = {
    case "ping" => {
      val b = context.actorSelection("../b")
      val future: Future[String] = ask(b, "ping").mapTo[String]
      future.onSuccess {
        case result: String ⇒ {
          log.info("Got result " + result) // <-- got result pong here, that's okay
        }
      }
    }
    case "pong" => {
      log.info("hmmm...")
    }
  }
}

class B extends Actor with ActorLogging {
  def receive = {
    case "ping" => {
      sender ! "pong"
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
    }
  }
}

这真的让我很困惑。现在你可以问 - 嘿,伙计,为什么你需要 B 发送很多消息?好吧,这是更复杂案例的一部分——A 向 B 询问消息。 B 回答。然后 A 等待 B 的另一条消息。这里的棘手部分是 plain 在 Future 完成后等待 - 我只是无法下定决心让该模型适合Akka 基础。

但是现在,我怎样才能正确处理所有 4 条消息而没有死信?谢谢:-D

【问题讨论】:

    标签: scala akka wait actor


    【解决方案1】:

    如果您不等待消息,您会发现代码更容易编写。当您不从时间或步骤顺序的角度考虑您的应用程序时,Actor 效果最佳。只要让它能够处理回调。但是用!不是 ?。如果需要,可以使用 become 或 FSM 来显示这两种状态。

    这不是您提出的问题,但这是您想知道的。避免演员的命令式风格可以防止大多数此类错误的发生。

    【讨论】:

    • 好吧,我倾向于同意你的观点。询问模式真的很难维护。 FSM去研究,然后:-D
    • 是的,询问模式很快就会变得混乱。我认为它仅适用于特定的边缘情况。就像在测试中您必须阻止响应以使测试正常运行一样。可能是您不需要 FSM。简单的参与者可以准备好在其正常的接收块中处理回调。如果您需要它在收到该回调之前不做任何其他事情,请使用 FSM 或成为。 become 更简单,但 FSM 在需要时有更多附加组件。比如on-transitions。
    【解决方案2】:

    您的问题是演员 B 没有回答演员 A。如果我们阅读询问模式的 documentation,我们会发现 ask 创建一个临时一次性演员来接收对消息的回复并完成一个scala.concurrent.Future 与它。

    这个临时演员根本不处理 "pong" 消息,他只是在等待您随后将其作为字符串的 Future 转换的任何答案。

    如果你想解决这个问题,你必须修改你的演员 B,让它首先回答临时的“询问演员”,然后直接向演员 A 发送消息。

    class B extends Actor with ActorLogging {
      def receive = {
        case "ping" => {
          sender ! "pong"  //the sender is the temp ask actor
          val a = context.actorSelection("../a") // get a ref on actor A
          a ! "pong"
          a ! "pong"
          a ! "pong"
        }
      }
    }
    

    这不是很干净,但现在我希望你明白发生了什么。

    【讨论】:

    • 您还可以在第一条消息中包含sendAdditionaReplies: ActorRef
    猜你喜欢
    • 1970-01-01
    • 2012-09-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-27
    • 2016-08-04
    • 2021-01-31
    • 2020-07-02
    相关资源
    最近更新 更多