【问题标题】:Get whole HttpResponse body as a String with Akka-Streams HTTP使用 Akka-Streams HTTP 将整个 HttpResponse 主体作为字符串获取
【发布时间】:2015-10-10 12:58:02
【问题描述】:

我正在尝试了解如何使用新的akka.http 库。我想向服务器发送一个 http 请求并将整个响应正文作为单个字符串读取,以生成 Source[String,?]

这是迄今为止我能够产生的最佳解决方案:

 def get(
   modelID: String,
   pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]
 ): Source[String,Unit] = {
   val uri = reactionsURL(modelID)
   val req = HttpRequest(uri = uri)
   Source.single( (req,0) )
     .via( pool )
     .map { 
       case (Success(resp),_) =>
         resp.entity.dataBytes.map( _.decodeString("utf-8") )
     }.flatten(FlattenStrategy.concat)
     .grouped( 1024 )
     .map( _.mkString )

它似乎工作得很好(除了缺少的错误路径),但对于这样简单的任务来说有点笨拙。有更聪明的解决方案吗?我可以避免grouped/mkString 吗?

【问题讨论】:

    标签: scala http akka akka-stream


    【解决方案1】:

    您可以使用 HttpResponse 的 toStrict 方法超时。它将整个答案收集为 Future。

    def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: Materializer): Future[Strict] 返回一个可共享和可序列化的

    带有严格实体的此消息的副本。

    例子:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{HttpResponse, HttpRequest}
    import akka.stream.{Materializer, ActorMaterializer}
    import akka.stream.scaladsl.{Sink, Flow, Source}
    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._
    
    import scala.util.{Try, Success}
    
    object Main extends App {
    
      implicit val system = ActorSystem()
    
      import system.dispatcher
    
      implicit val materializer = ActorMaterializer()
    
      val host = "127.0.0.1"
      lazy val pool = Http().newHostConnectionPool[Int](host, 9000)
    
      FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run()
    
    }
    
    object FlowBuilder {
      def get(modelID: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool])
             (implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = {
        val uri = modelID
        val req = HttpRequest(uri = modelID)
        Source.single((req, 0)).via(pool)
          .map { 
          case (Success(resp), _) => resp.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8")) 
        }
      }
    }
    

    【讨论】:

    • 好的。但我需要一个Source[String,Unit],因为我有进一步的转换要应用。使用您的解决方案,我将需要传递 Materializer 和 ExecutionContext,然后将所有内容重新包装在一个新的 Source 中......这不是更笨重还是我错过了什么?
    • @paradigmatic 我添加示例。 ExecutionContext 和 materializer 可以设置为隐式参数。
    【解决方案2】:

    您可以使用Unmarshall,它也适用于其他类型,例如来自 Spray-json 的 json。这也是strict 返回Future[_]

    例子:

      authedReq.via(authServerReqResFlow).mapAsync(1) { case (tryRes, _) =>
          tryRes match {
            case Failure(exception) => Future.failed[Principal](exception)
            case Success(response @ HttpResponse(StatusCodes.OK,_,_,_)) =>
              val userContext = Unmarshal(response).to[UserContextData]
              userContext.map {
                case UserContextData(UserInfo(_, userName, fullName, email, title), _, _) =>
                  Principal(userName, fullName, email, title)
              }
            case Success(response @ HttpResponse(responseCode,_,entity,_)) =>
              Unmarshal(entity).to[String].flatMap(msg => Future.failed(new AuthenticationFailure(s"$responseCode\n$msg")))
          }
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-01-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-02-06
      • 2015-08-30
      • 2022-01-15
      相关资源
      最近更新 更多