【问题标题】:Akka: how to manage a double http request?Akka:如何管理双 http 请求?
【发布时间】:2016-08-29 14:18:43
【问题描述】:

在我的演员中,我有两个不同地址的请求,如下所示:

http.singleRequest(HttpRequest(uri = encodedUri).addHeader(Accept(MediaTypes.`application/json`)))

我需要这两个请求都返回一个值。作为正常的期货,我希望是这样的:

val response: Future[SomeData] = for {
        r1 <- firstRequest
        r2 <- secondRequest
      } yield {
        // merge the results of these two responses
      }

      response onComplete {
        case Success(body) => sndr ! Something(body)
        case Failure(message) => BadRequest(message.toString)
      }

在这部分文档中:

http://doc.akka.io/docs/akka/2.4/scala/http/client-side/request-level.html

建议使用pipeToself来管理单个请求,而不是使用原生的onComplete/map/etc

如何将它应用于多个请求,例如我需要等待 2 个或更多完成的情况?

【问题讨论】:

    标签: scala akka


    【解决方案1】:

    简单直接

    val f1 = Future { //request1 }
    
    val f2 = Future { //request2 }
    
    val resultF = f1 zip f2 
    
    resultF pipeTo self
    

    当前参与者将获得结果作为消息,消息将是一个元组(f1Result, f2Result)

    如果结果 resultF 失败,则当前参与者获取包裹在 akka.actor.Status.Failure 中的失败消息

    在方法中f1f2是独立的期货

    如果f2 取决于f1 的值,请使用flatMap

     val resultF = f1.flatMap { f1Result => createF2(f1Result) }
    
     //alternatively we can use for comprehension
    
     resultF pipeTo self
    

    示例

    import akka.actor.Actor
    import akka.actor.Status.Failure
    
    import scala.concurrent.Future
    import akka.pattern.pipe
    
    object ManagerActor {
      case object Exec
    }
    
    class ManagerActor extends Actor {
    
      import ManagerActor._
    
      implicit val dispather = context.dispatcher
    
     override def receive: Receive = {
        case Exec =>
    
        val f1 = Future { 1 }
    
        val f2 = Future { 2 }
    
        val resultF = f1 zip f2
    
        resultF pipeTo self
    
       case result: (Int, Int) => //be careful about type erasure 
         println(s"""result:$result""")
    
       case Failure(th) =>
         println(s"""msg: ${th.getMessage}""")
    
       }
    }
    

    运行

    object Main {
     def main(args: Array[String]): Unit = {
       val system = ActorSystem()
       val actor = system.actorOf(Props[ManagerActor])
       actor ! ManagerActor.Exec
       Thread.sleep(1000000)
     }
    }
    

    我们可以使用Future.sequence 代替zip 将其推广到任意数量的http 请求。

    【讨论】:

    • flatMap 可能不如map。我认为真正的问题是发件人在未来线程中的可见性。
    猜你喜欢
    • 1970-01-01
    • 2020-10-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-14
    • 1970-01-01
    • 2018-08-28
    • 2016-08-15
    相关资源
    最近更新 更多