【问题标题】:akka-http: send element to akka sink from http routeakka-http:将元素从 http 路由发送到 akka sink
【发布时间】:2017-02-24 16:49:36
【问题描述】:

如何从 Akka HTTP 路由向 Akka Sink 发送元素/消息?我的 HTTP 路由仍然需要返回正常的 HTTP 响应。

我想这需要一个流分支/连接点。正常的 HTTP 路由是来自 HttpRequest -> HttpResponse 的流。我想添加一个分支/连接,以便 HttpRequests 可以触发事件到我的单独接收器以及生成正常的 HttpResponse。

下面是一个非常简单的单路由 akka-http 应用。为简单起见,我使用了一个简单的 println 接收器。我的生产用例显然会涉及一个不那么琐碎的接收器。

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  val route: akka.http.scaladsl.server.Route =
    path("hello" / Segment) { p =>
      get {
        // I'd like to send a message to an Akka Sink as well as return an HTTP response.
        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }

  val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
  val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

编辑:或者在使用低级 akka-http API 时,我如何将特定消息从特定路由处理程序发送到接收器?

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<html><body>Hello world!</body></html>"))

    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "PONG!")

    case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
      sys.error("BOOM!")

    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

  val serverSource = Http().bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] =
    serverSource.to(Sink.foreach { connection =>
      println("Accepted new connection from " + connection.remoteAddress)

      connection handleWithSyncHandler requestHandler
      // this is equivalent to
      // connection handleWith { Flow[HttpRequest] map requestHandler }
    }).run()

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

【问题讨论】:

    标签: scala akka-stream akka-http


    【解决方案1】:

    如果您想将整个HttpRequest 发送到您的接收器,我想说最简单的方法是使用alsoTo 组合器。结果将类似于

    val mySink: Sink[HttpRequest, NotUsed] = ???
    
    val handlerFlow = Flow[HttpRequest].alsoTo(mySink).via(RouteResult.route2HandlerFlow(route))
    
    val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)
    

    仅供参考:alsoTo 实际上隐藏了 Broadcast 阶段。

    IF 相反,您需要有选择地从特定子路由向 Sink 发送消息,您别无选择,只能为每个传入请求实现一个新流。请参阅下面的示例

    val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
    
    val route: akka.http.scaladsl.server.Route =
      path("hello" / Segment) { p =>
        get {
    
          (extract(_.request) & extractMaterializer) { (req, mat) ⇒
            Source.single(req).runWith(sink)(mat)
    
            complete {
              s"<h1>Say hello to akka-http. p=$p</h1>"
            }
          }
        }
      }
    

    另外,请记住,您始终可以完全放弃高级 DSL,并使用 lower-level streams DSL 为您的整个路线建模。这将导致更冗长的代码 - 但会让您完全控制您的流具体化。

    编辑:下面的示例

    val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
    
    val handlerFlow =
      Flow.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
    
        val partition = b.add(Partition[HttpRequest](2, {
          case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ 0
          case _                                        ⇒ 1
        }))
        val merge = b.add(Merge[HttpResponse](2))
    
        val happyPath = Flow[HttpRequest].map{ req ⇒
          HttpResponse(entity = HttpEntity(
            ContentTypes.`text/html(UTF-8)`,
            "<html><body>Hello world!</body></html>"))
        }        
    
        val unhappyPath = Flow[HttpRequest].map{
          case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
          HttpResponse(entity = "PONG!")
    
          case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
          sys.error("BOOM!")
    
          case r: HttpRequest =>
            r.discardEntityBytes() // important to drain incoming HTTP Entity stream
            HttpResponse(404, entity = "Unknown resource!")
        }
    
        partition.out(0).alsoTo(sink) ~> happyPath   ~> merge
        partition.out(1)              ~> unhappyPath ~> merge
    
        FlowShape(partition.in, merge.out)
      })
    
    val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)
    

    【讨论】:

    • 您的建议添加将整个 HttpRequest 发送到接收器,我想要一个特定的路由来将特定消息发送到接收器。
    • 知道了,我添加了更多信息
    • 谢谢!你能告诉我如何使用低级 API 做到这一点吗?
    【解决方案2】:

    这是我使用的似乎很理想的解决方案。 Akka Http 似乎被设计为使您的路由是简单的 HttpRequest->HttpResponse 流并且不涉及任何额外的分支。

    我没有将所有内容构建到单个 Akka 流图中,而是有一个单独的 QueueSource->Sink 图,而普通的 Akka Http HttpRequest->HttpResponse 流只是根据需要将元素添加到源队列中。

    object HttpWithSinkTest {
      def buildQueueSourceGraph(): RunnableGraph[(SourceQueueWithComplete[String], Future[Done])] = {
        val annotateMessage: Flow[String, String, NotUsed] = Flow.fromFunction[String, String](s => s"got message from queue: $s")
    
        val sourceQueue = Source.queue[String](100, OverflowStrategy.dropNew)
        val sink: Sink[String, Future[Done]] = Sink.foreach(println)
        val annotatedSink = annotateMessage.toMat(sink)(Keep.right)
        val queueGraph = sourceQueue.toMat(annotatedSink)(Keep.both)
    
        queueGraph
      }
    
      def buildHttpFlow(queue: SourceQueueWithComplete[String],
                        actorSystem: ActorSystem, materializer: ActorMaterializer): Flow[HttpRequest, HttpResponse, NotUsed] = {
        implicit val actorSystemI = actorSystem
        implicit val materializerI = materializer
    
        val route: akka.http.scaladsl.server.Route =
          path("hello" / Segment) { p =>
            get {
              complete {
                queue.offer(s"got http event p=$p")
    
                s"<h1>Say hello to akka-http. p=$p</h1>"
              }
            }
          }
    
        val routeFlow = RouteResult.route2HandlerFlow(route)
    
        routeFlow
      }
    
      def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("my-akka-http-test")
        val executor = actorSystem.dispatcher
        implicit val materializer = ActorMaterializer()(actorSystem)
    
        val (queue, _) = buildQueueSourceGraph().run()(materializer)
    
        val httpFlow = buildHttpFlow(queue, actorSystem, materializer)
        val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
        val bindingFuture = httpExt.bindAndHandle(httpFlow, "localhost", 8080)
    
        println("Server online at http://localhost:8080/")
        println("Press RETURN to stop...")
        scala.io.StdIn.readLine()
    
        println("Shutting down...")
    
        val serverBinding = Await.result(bindingFuture, Duration.Inf)
        Await.result(serverBinding.unbind(), Duration.Inf)
        Await.result(actorSystem.terminate(), Duration.Inf)
    
        println("Done. Exiting")
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-12-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-17
      • 2015-12-09
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多