【问题标题】:Akka-http process requests with StreamAkka-http 使用 Stream 处理请求
【发布时间】:2015-11-04 21:11:40
【问题描述】:

我尝试编写一些简单的基于 akka-http 和 akka-streams 的应用程序来处理 http 请求,总是使用一个预编译流,因为我计划在我的 requestProcessor 流中使用带有背压的长时间处理

我的申请代码:

import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Sink, Source}

import scala.annotation.tailrec
import scala.concurrent.Future

object UserRegisterSource {
  def props: Props = Props[UserRegisterSource]

  final case class RegisterUser(username: String)

}

class UserRegisterSource extends ActorPublisher[UserRegisterSource.RegisterUser] {

  import UserRegisterSource._
  import akka.stream.actor.ActorPublisherMessage._

  val MaxBufferSize = 100
  var buf = Vector.empty[RegisterUser]

  override def receive: Receive = {
    case request: RegisterUser =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(request)
      else {
        buf :+= request
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

object Main extends App {
  val host = "127.0.0.1"
  val port = 8094

  implicit val system = ActorSystem("my-testing-system")
  implicit val fm = ActorFlowMaterializer()
  implicit val executionContext = system.dispatcher

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http(system).bind(interface = host, port = port)

  val mySource = Source.actorPublisher[UserRegisterSource.RegisterUser](UserRegisterSource.props)
  val requestProcessor = mySource
    .mapAsync(1)(fakeSaveUserAndReturnCreatedUserId)
    .to(Sink.head[Int])
    .run()

  val route: Route =
    get {
      path("test") {
        parameter('test) { case t: String =>
          requestProcessor ! UserRegisterSource.RegisterUser(t)

          ???
        }
      }
    }

  def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
    Future.successful {
      1
    }

  serverSource.to(Sink.foreach {
    connection =>
      connection handleWith Route.handlerFlow(route)
  }).run()
}

我找到了有关如何创建可以动态接受要处理的新项目的 Source 的解决方案,但我可以找到任何解决方案,说明如何在我的路由中获取流执行的结果

【问题讨论】:

标签: scala akka akka-stream akka-http


【解决方案1】:

您问题的直接答案是为每个 HttpRequest 实现一个新 Stream 并使用 Sink.head 来获取您正在寻找的值。修改你的代码:

val requestStream = 
  mySource.map(fakeSaveUserAndReturnCreatedUserId)
          .to(Sink.head[Int]) 
          //.run() - don't materialize here

val route: Route =
  get {
    path("test") {
      parameter('test) { case t: String =>
        //materialize a new Stream here
        val userIdFut : Future[Int] = requestStream.run()

        requestProcessor ! UserRegisterSource.RegisterUser(t)

        //get the result of the Stream
        userIdFut onSuccess { case userId : Int => ...}
      }
    }
  }

但是,我认为您的问题不恰当。在您的代码示例中,您使用 akka Stream 的唯一目的是创建一个新的 UserId。 Futures 可以轻松解决这个问题,而无需物化 Stream(以及所有随附的 overhead):

val route: Route =
  get {
    path("test") {
      parameter('test) { case t: String =>
        val user = RegisterUser(t)

        fakeSaveUserAndReturnCreatedUserId(user) onSuccess { case userId : Int =>
          ...
        }
      }
    }
  }

如果您想限制对fakeSaveUserAndReturnCreateUserId 的并发调用数,那么您可以创建一个具有定义的线程池大小的ExecutionContext,如this question 的答案中所述,并使用该 ExecutionContext 创建期货:

val ThreadCount = 10 //concurrent queries

val limitedExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ThreadCount))

def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
Future { 1 }(limitedExecutionContext)

【讨论】:

  • 您好,如果正确理解调用requestStream.run(),将不会使用akka-streams 的每个请求的背压机制,因为每次运行都会在不同的流实例中进行?
  • 没有其他解决方案。您不能将多个消费者(接收器)连接到同一个 Source,因为它们每个都会以不同的速率处理数据,因此具有独特的背压。您唯一需要背压的是限制并发 UserId 创建的数量,这可以使用有限的 ExecutionContext 来完成,如答案中所述。
猜你喜欢
  • 1970-01-01
  • 2016-08-15
  • 2017-01-19
  • 2018-08-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-29
  • 2020-05-10
相关资源
最近更新 更多