【问题标题】:Akka Streams with Akka HTTP Server and ClientAkka Streams 与 Akka HTTP 服务器和客户端
【发布时间】:2015-12-14 05:37:29
【问题描述】:

我正在尝试在我的 Akka Http Server 上创建一个端点,它使用外部服务告诉用户它的 IP 地址(我知道这可以更容易地执行,但我这样做是一个挑战)。

最上层不使用流的代码是这样的:

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val requestHandler: HttpRequest => Future[HttpResponse] = {
  case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
    Http().singleRequest(HttpRequest(GET, Uri("http://checkip.amazonaws.com/"))).flatMap { response =>
      response.entity.dataBytes.runFold(ByteString(""))(_ ++ _) map { string =>
        HttpResponse(entity = HttpEntity(MediaTypes.`text/html`,
          "<html><body><h1>" + string.utf8String + "</h1></body></html>"))
      }
    }

  case _: HttpRequest =>
    Future(HttpResponse(404, entity = "Unknown resource!"))
}

Http().bindAndHandleAsync(requestHandler, "localhost", 8080)

它工作正常。然而,作为一个挑战,我想限制自己只使用流(没有Future's)。

这是我认为我会用于这种方法的布局: Source[Request] -&gt; Flow[Request, Request] -&gt; Flow[Request, Response] -&gt;Flow[Response, Response] 并适应 404 路线,还有 Source[Request] -&gt; Flow[Request, Response]。现在,如果我的 Akka Stream 知识对我有用,我需要使用 Flow.fromGraph 来处理这样的事情,但是,这就是我卡住的地方。

Future 中,我可以为各种端点制作一个简单的地图和平面地图,但在流中这意味着将流分成多个流,我不太确定该怎么做。我考虑过使用 UnzipWith 和 Options 或通用广播。

对于这个主题的任何帮助将不胜感激。


如果这有必要,我不知道? -- http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html

【问题讨论】:

  • 提出一个人为的挑战然后期望别人解决它的挑战在哪里? :)
  • @pvg Touché 哈哈。我将删除我的问题并提出一个新问题,我有兴趣将一个流拆分为多个流,因为这在文档中没有很好的记录(我怀疑这个功能是否会违背流,但我不确定)。

标签: scala akka akka-stream akka-http


【解决方案1】:

您不需要使用Flow.fromGraph。相反,使用 flatMapConcat 的单个 Flow 将起作用:

//an outgoing connection flow
val checkIPFlow = Http().outgoingConnection("checkip.amazonaws.com")

//converts the final html String to an HttpResponse
def byteStrToResponse(byteStr : ByteString) = 
  HttpResponse(entity = new Default(MediaTypes.`text/html`,
                                    byteStr.length,
                                    Source.single(byteStr)))

val reqResponseFlow = Flow[HttpRequest].flatMapConcat[HttpResponse]( _ match {
  case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
    Source.single(HttpRequest(GET, Uri("http://checkip.amazonaws.com/")))
          .via(checkIPFlow)
          .mapAsync(1)(_.entity.dataBytes.runFold(ByteString(""))(_ ++ _))
          .map("<html><body><h1>" + _.utf8String + "</h1></body></html>")
          .map(ByteString.apply)
          .map(byteStrToResponse)

  case _ =>
    Source.single(HttpResponse(404, entity = "Unknown resource!"))    
})

然后可以使用此流程绑定到传入请求:

Http().bindAndHandle(reqResponseFlow, "localhost", 8080)

而且都没有期货...

【讨论】:

  • 没想到有源代码的类似 flatMap 的解决方案。接受为答案!
  • @MartijnR 我很高兴我能对这个问题提出不同的看法。快乐的黑客攻击!
  • @RamonJRomeroyVigil 你能看看这个问题吗? stackoverflow.com/questions/36776762/…
猜你喜欢
  • 1970-01-01
  • 2018-10-24
  • 2019-03-04
  • 2019-04-11
  • 1970-01-01
  • 2018-05-25
  • 2018-02-06
  • 1970-01-01
  • 2021-10-18
相关资源
最近更新 更多