【问题标题】:Spray route get response from child actor喷雾路线得到儿童演员的回应
【发布时间】:2015-06-15 12:35:40
【问题描述】:

我试图弄清楚如何设置一个调用适当子级的主 Actor,以支持我试图模拟 db 调用的一些喷射路由。我是 akka / spray 的新手,所以只是想更好地了解如何正确设置 spray -> actor -> db 调用(等等)。我可以从顶级演员那里得到响应,但是当我尝试从父级以下的一个演员级别得到它时,我似乎无法得到任何工作。

查看演员的路径时,从我从喷射路线进行调用的方式看来,我是从临时演员经过的。以下是我到目前为止的内容。这必须只是用户错误/无知,只是不确定如何进行。任何建议,将不胜感激。

下面的 Demo Spray Service 和 Redis Actor 代码 sn-ps 显示了我从路由调用 actor 的位置以及我遇到问题的多个 actor(希望我的路由从 SummaryActor 获得响应)。谢谢!

开机:

object Boot extends App {

  // we need an ActorSystem to host our application in
  implicit val system = ActorSystem("on-spray-can")

  // create and start our service actor
  val service = system.actorOf(Props[DemoServiceActor], "demo-service")

  implicit val timeout = Timeout(5.seconds)
  // start a new HTTP server on port 8080 with our service actor as the handler
  IO(Http) ? Http.Bind(service, interface = "localhost", port = 8080)
}

演示服务角色(用于喷涂)

class DemoServiceActor extends Actor with Api {

  // the HttpService trait defines only one abstract member, which
  // connects the services environment to the enclosing actor or test
  def actorRefFactory = context

  // this actor only runs our route, but you could add
  // other things here, like request stream processing
  // or timeout handling
  def receive = handleTimeouts orElse runRoute(route)

  //Used to watch for request timeouts
  //http://spray.io/documentation/1.1.2/spray-routing/key-concepts/timeout-handling/
  def handleTimeouts: Receive = {
    case Timedout(x: HttpRequest) =>
      sender ! HttpResponse(StatusCodes.InternalServerError, "Too late")
  }


}

//Master trait for handling large APIs
//http://stackoverflow.com/questions/14653526/can-spray-io-routes-be-split-into-multiple-controllers
trait Api extends DemoService {
  val route = {
    messageApiRouting
  }
}

演示喷涂服务(路线):

trait DemoService extends HttpService with Actor  {
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val redisActor = context.actorOf(Props[RedisActor], "redisactor")

  val messageApiRouting =
        path("summary" / Segment / Segment) { (dataset, timeslice) =>
          onComplete(getSummary(redisActor, dataset, timeslice)) {
            case Success(value) => complete(s"The result was $value")
            case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
          }
        }

  def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = Future {

    val dbMessage = DbMessage("summary", dataset + timeslice)
    val future = redisActor ? dbMessage
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    result
  }

}

Redis Actor(尚未模拟实际的 Redis 客户端)

class RedisActor extends Actor with ActorLogging {
  //  val pool = REDIS
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val summaryActor = context.actorOf(Props[SummaryActor], "summaryactor")


  def receive = {

    case msg: DbMessage => {
      msg.query match {
        case "summary" => {
          log.debug("Summary Query Request")
          log.debug(sender.path.toString)
           summaryActor ! msg
        }
      }
    }

    //If not match log an error
    case _ => log.error("Received unknown message: {} ")
  }
}

class SummaryActor extends Actor with ActorLogging{

  def receive = {
    case msg: DbMessage =>{
      log.debug("Summary Actor Received Message")
      //Send back to Spray Route

    }
  }
}

【问题讨论】:

    标签: scala akka spray


    【解决方案1】:

    您的代码的第一个问题是您需要从主actor 转发到子actor,以便sender 被正确传播并可供子actor 响应。所以改变这个(RedisActor):

    summaryActor ! msg
    

    收件人:

    summaryActor forward msg
    

    这是主要问题。修复它,您的代码应该开始工作。不过,还有其他需要注意的地方。您的getSummary 方法当前定义为:

    def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = 
      Future {
        val dbMessage = DbMessage("summary", dataset + timeslice)
        val future = redisActor ? dbMessage
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        result
      }
    

    这里的问题是ask 操作 (?) 已经返回了一个 Future,所以你正在阻止它以获取结果,将它包装在另一个 Future 中以便你可以返回FutureonComplete 使用。您应该能够通过直接使用从ask 返回的Future 来简化事情,如下所示:

    def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = {
      val dbMessage = DbMessage("summary", dataset + timeslice)
      (redisActor ? dbMessage).mapTo[String]      
    }
    

    【讨论】:

    • 非常感谢。我将看看建议的更新。谢谢!
    • 在两个帐户上都像魅力一样工作。再次感谢。
    【解决方案2】:

    只是对上述方法的重要评论。

    由于 getSummary(...) 函数返回一个 Future[String] 对象并且您在 onComplete(...) 函数中调用它,您需要导入:

    导入 ExecutionContext.Implicits.global

    这样,您将通过让 Future 在范围内拥有 ExecutionContext 声明一个隐式 ExecutionContext 参数。

    ** 如果你不这样做,你最终会得到一个不匹配的错误 因为 onComplete(...) 需要一个 onComplete Future 磁铁对象,但你给了一个 Future[String] 对象。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-05-06
      • 2015-10-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-07
      相关资源
      最近更新 更多