【问题标题】:Akka round-robin: Sending response from remote routees to senderAkka 循环:从远程路由向发送者发送响应
【发布时间】:2016-10-06 14:00:07
【问题描述】:

我正在使用 Akka 集群(版本 2.4.10),其中很少有节点被指定为“前端”角色,而其他节点很少被指定为“工作者”。工人在远程机器上。传入的工作由前端参与者通过循环路由分配给工作人员。问题是将“工人”的响应发回给前端参与者。我可以看到工作正在由工人完成。但是工人发送到前端的消息没有到达并最终成为死信。我在日志中看到以下错误。

[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.

我见过this,我在我的代码中也遵循同样的方法。我也见过this,但建议的解决方案不适用于这种情况,因为我不知道预先的路由。它通过配置来实现,并且可以更改。轮询路由器配置如下。

akka.actor.deployment {
  /frontEnd/hm = {
    router = round-robin-group
    nr-of-instances = 5
    routees.paths = ["/user/hmWorker"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = on
    }
  }
}

路由器在前端actor中被实例化,如下所示。

val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))

控制器和工人代码如下。

// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {

    val list = List("a", "b") // Assume this is a big list

    val groups = list.grouped(500)

    override def receive: Actor.Receive = {
      val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]]))
      val future = Future.sequence(futures).map(_.flatten)
      val result = Await.result(future, 50 seconds)
      println(s"Result is $result")
    }
}

// Node 2
class Worker extends Actor {

    override def receive: Actor.Receive = {
      case Message(lst) =>
            val future: Future[List[String]] = // Do Something asynchronous
            future onComplete {
                case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
                case Failure(th) => // Error handling
            }
    }
}

请让我知道我在这里做错了什么。感谢您的帮助。

【问题讨论】:

    标签: scala akka akka-cluster akka-remote-actor akka-remoting


    【解决方案1】:

    您不应该在Future 的回调中使用sender()。在处理回调时,sender() 所指的内容可能与您收到消息时不同。

    考虑先将引用保存在回调之外,例如:

    override def receive: Actor.Receive = {
      case Message(lst) =>
            val future: Future[List[String]] = // Do Something asynchronous
            val replyTo: ActorRef = sender()
            future onComplete {
                case Success(r) => replyTo.!(r)(context.parent) // This message is not delivered to Controller actor.
                case Failure(th) => // Error handling
            }
    }
    

    或者更好的是,使用管道模式:

    import akka.pattern.pipe
    override def receive: Actor.Receive = {
      case Message(lst) =>
        val future: Future[List[String]] = // Do Something asynchronous
        future.pipeTo(sender())
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-12-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-04-12
      • 1970-01-01
      相关资源
      最近更新 更多