【问题标题】:akka sending a closure to remote actor + Error in sending messages (case class) between Scala Akka remote actorsakka 向远程参与者发送闭包 + 在 Scala Akka 远程参与者之间发送消息(案例类)时出错
【发布时间】:2017-03-12 16:51:24
【问题描述】:

新查询:我试图将 DSum() 作为参数从 localActor 传递给 RemoteActor,DSum() 将在远程节点进行一些计算。我无法将其发送给 RemoteActor。有可能吗?(代码如下)

完成:我正在尝试连接远程参与者和本地参与者,并尝试使用案例类发送对象,但是从 localActor 调用时无法获取远程参与者的消息类( Common.Message(msg) ) ,而是得到 "case _ => println("Received unknown msg from local ")"

 1.package.scala

package object check {
    trait Context
    case object Start
    case class Message(msg: String)
    case class CxtDA(cxtA: List[CxtA])
    case class RCxt(var cxtA: List[CxtA], var cxtB: List[CxtB], var v1: Int, var v2: String) extends Context
    case class CxtA(var cxtC: List[CxtC], var v1: Int) extends Context
    case class CxtB(var cxtC: List[CxtC], var v1: Int) extends Context
    case class CxtC(var v1: String, var v2: Int) extends Context
    case class Task(var t1: DSum()) 

}


2. Remote Actor

package com.akka.remote

import java.io.File

import akka.actor._
import com.typesafe.config.ConfigFactory
import check._

/**
 * Remote actor which listens on port 5150
 */


class RemoteActor extends Actor {

    override def toString: String = {
        return "You printed the Local";
    }
    def receive = {
    case msg: String => {
      println("remote received " + msg + " from " + sender)
      sender ! "hi"
    }
    case Message(msg) =>
        println("RemoteActor received message "+ msg)
        sender ! Message("Hello from server")

    case CxtDA(cxtA) =>
        println("cxtA "+ cxtA)

    case Task(taskA) =>
            println ("recieved closure")


    case _ => println("unknown msg")
  }
}



object RemoteActor{


   def main(args: Array[String]) {
     //get the configuration file from classpath
    val configFile = getClass.getClassLoader.getResource("remote_application.conf").getFile
    // //parse the config
    val config = ConfigFactory.parseFile(new File(configFile))
    // //create an actor system with that config
    val system = ActorSystem("RemoteSystem" , config)
    // //create a remote actor from actorSystem
     val remoteActor = system.actorOf(Props[RemoteActor], name="remote")
     println("remote is ready")
    remoteActor ! Message("Hello from active remote")

   }
 }


3.Local Actor

package com.akka.local

import java.io.File

import akka.actor.{Props, Actor, ActorSystem}
import com.typesafe.config.ConfigFactory
import check._
import scala.util.Random

/**
 * Local actor which listens on any free port
 */
trait CxtTask {
    type CxtT <: Context
    def work(ctx: CxtT): CxtT
}


class DSum extends CxtTask with Serializable{
  override type CxtT = CxtA
    def work(ctx: CxtA): CxtA = {
    val sum = ctx.cxtC.foldRight(0)((v, acc) => v.v2 + acc)
    ctx.cxtC= List()
    ctx.v1 = sum
    println("ctx: " + ctx)
    ctx

  }
}

class LocalActor extends Actor{ 
    // import Common._

  @throws[Exception](classOf[Exception])
    val  remoteActor = context.actorSelection("akka.tcp://RemoteSystem@127.0.0.1:5150/user/remote")
    println("That 's remote:" + remoteActor)
    remoteActor ! "hi"
    var counter = 0  

    override def toString: String = {
        return "You printed the Local";
    }

  def receive = {   

    case msg:String => {
      println("got message from remote" + msg)
    }
    case Start =>
        println("inside Start.local "+ remoteActor)
        remoteActor ! Message("Hello from the LocalActor")


    case Message(msg) =>
         println("LocalActor received message: "+ msg)
        if (counter < 5) {
            sender ! Message("Hello back to you")
            counter += 1
        }

    case CxtDA(cxtA) =>
            remoteActor ! CxtDA(cxtA)

    case Task(t1) =>
            remoteActor ! Task(t1)


  }
}


 object LocalActor {

   def main(args: Array[String]) {

    val configFile = getClass.getClassLoader.getResource("local_application.conf").getFile
    val config = ConfigFactory.parseFile(new File(configFile))
    val system = ActorSystem("ClientSystem",config)
    val localActor = system.actorOf(Props[LocalActor], name="local")
    localActor ! Start

    def createRndCxtC(count: Int):List[CxtC] = (for (i <- 1 to count) yield CxtC(Random.nextString(5), 3)).toList

    def createRndCxtB(count: Int): List[CxtB] = (for (i <- 1 to count) yield CxtB(createRndCxtC(count), Random.nextInt())).toList

    def createRndCxtA(count: Int): List[CxtA] = (for (i <- 1 to count) yield CxtA(createRndCxtC(count), Random.nextInt())).toList

    val tree = RCxt(createRndCxtA(2),createRndCxtB(2),1,"")
    val workA = new DSum()
    tree.cxtA.foreach(ctxa =>workA.work(ctxa))
    localActor ! Task(new DSum())
  }
}

[Remote actor output][1]


  [1]: https://i.stack.imgur.com/mtmvU.jpg

【问题讨论】:

  • 您是否尝试过打印“_”。它显示了什么?
  • 好吧,我现在试过了,但它无法为 RemoteActor.scala 编译“case _ => println(_)”
  • 您不能使用通配符本身打印消息(通配符用于替换未使用的内容,因此您与它的用例相矛盾)。只需输入case msg =&gt; println(msg)

标签: scala akka actor


【解决方案1】:

这里的关键是您为每个参与者定义了两个不同的协议:

  • 位于RemoteActor.scala 文件中的Common 对象
  • 位于LocalActor.scala 文件中的Common 对象

因此,当在本地 Actor 中发送 Common.Message 时,您基本上是在创建与来自远程 Actor 的 Common.Message 类型不同的消息。因此,actor 无法处理它。

作为 Akka 中的一个很好的实践,只要一个参与者有一个特定的消息协议,就应该在它的伴生对象中定义它。但是,如果您有多个共享相同协议的参与者(它们的行为是通过处理这些类型的消息来定义的),那么您应该将该协议放在一个对象中并从您的参与者中导入它。

我希望这会有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-31
    • 1970-01-01
    • 1970-01-01
    • 2018-12-10
    • 2013-09-04
    • 2015-06-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多