【问题标题】:Why akka http is not emiting responses for first N requests?为什么akka http没有为前N个请求发出响应?
【发布时间】:2016-03-21 16:17:10
【问题描述】:

我正在尝试使用 akka-http 向单个主机发出 http 请求(例如“akka.io”)。问题是创建的流 (Http().cachedHostConnectionPool) 仅在发出 N 个 http 请求后才开始发出响应,其中 N 等于 max-connections。

import scala.util.Failure
import scala.util.Success
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object ConnectionPoolExample extends App {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val config = ConfigFactory.load()

  val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10)
  lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings)

  val fakeSource = Source.fromIterator[Unit] { () => Iterator.continually { Thread.sleep(1000); () } }
  val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) }

  val responses = requests.via(poolClientFlow)

  responses.runForeach {
    case (tryResponse, jsonData) =>
      tryResponse match {
        case Success(httpResponse) =>
          httpResponse.entity.dataBytes.runWith(Sink.ignore)
          println(s"status: ${httpResponse.status}")
        case Failure(e) => {
          println(e)
        }
      }
  }
}

输出如下:

Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
status: 200 OK
Creating request
status: 200 OK
Creating request
status: 200 OK
...

我找不到任何配置参数,这些参数允许在响应就绪后立即发出响应,而不是在池没有可用连接时发出响应。

谢谢!

【问题讨论】:

    标签: scala akka akka-http


    【解决方案1】:

    原因是您通过调用Thread.sleep 来阻止客户端执行其他工作——该方法在响应式程序中只是被禁止的。正确且简单的方法是使用Source.tick

    【讨论】:

    • 谢谢罗兰。特定示例通过使用 Source.tick 解决。不幸的是,在这个 fakeSource 中使用了 Thread.sleep(1000)。真正的来源是从 Kafka 读取的,它是通过扩展 GraphStage[SourceShape[A]] ... val stream = consumerMap.getOrElse(topicName, List()).head setHandler(out, new OutHandler { override def onPull(): Unit = { val jsonData = JsonParser(stream.head.message()).convertTo[A] push(out, jsonData) } }) ... 来实现的... 是不是也阻塞了客户端?
    • 是的,您需要在该源上添加 .async 以将其与流的其余部分分离。我们还致力于适当的 Kafka 集成,请参阅 reactive-kafka。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-04
    • 2018-11-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多