【发布时间】:2017-02-24 16:49:36
【问题描述】:
如何从 Akka HTTP 路由向 Akka Sink 发送元素/消息?我的 HTTP 路由仍然需要返回正常的 HTTP 响应。
我想这需要一个流分支/连接点。正常的 HTTP 路由是来自 HttpRequest -> HttpResponse 的流。我想添加一个分支/连接,以便 HttpRequests 可以触发事件到我的单独接收器以及生成正常的 HttpResponse。
下面是一个非常简单的单路由 akka-http 应用。为简单起见,我使用了一个简单的 println 接收器。我的生产用例显然会涉及一个不那么琐碎的接收器。
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val route: akka.http.scaladsl.server.Route =
path("hello" / Segment) { p =>
get {
// I'd like to send a message to an Akka Sink as well as return an HTTP response.
complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
编辑:或者在使用低级 akka-http API 时,我如何将特定消息从特定路由处理程序发送到接收器?
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
val serverSource = Http().bind(interface = "localhost", port = 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler
// this is equivalent to
// connection handleWith { Flow[HttpRequest] map requestHandler }
}).run()
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
【问题讨论】:
标签: scala akka-stream akka-http