【问题标题】:akka stream http rate limitakka 流 http 速率限制
【发布时间】:2016-11-28 11:26:00
【问题描述】:

我的计算图的一个阶段是类型流 Flow[Seq[Request], Seq[Response], NotUsed]。 显然,这个阶段应该为每个请求分配一个响应,并在所有请求都解决后发出 seq。

现在,底层 API 有严格的速率限制策略,所以我每秒只能触发一个请求。如果我有一个Flow 的单个Requests,我可以zip 这个流有一个每秒发出一个元素的流(How to limit an Akka Stream to execute and send down one message only once per second?),但在这种情况下我看不到类似的解决方案。

有没有很好的表达方式?我想到的想法是使用低级别的 Graph DSL 并在那里有一个一秒滴答的流作为状态,并使用它来处理请求的序列,但我怀疑它会变得好看。

【问题讨论】:

  • 你考虑过 flow.throttle 吗?
  • 是的,但是因为我有一个Seq[Request],所以我需要在这个Seq 中的每个请求之间等待。所以,我也需要一个圆顶式的内部油门
  • flatMapConcat(seq => Source(seq).throttle(…).grouped(seq.size))?
  • @ViktorKlang 据我了解,这将限制每个组的流量,而不是组之间组本身,就像每个seq我们创建一个新的独立throttle
  • 你的测试显示了什么?

标签: scala akka akka-stream akka-http


【解决方案1】:

正如维克多所说,您可能应该使用默认油门。但如果你想自己做,它可能看起来像这样

private def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val ticker = Source.tick(rate, rate, Unit)

  val zip = builder.add(Zip[T, Unit.type])
  val map = Flow[(T, Unit.type)].map { case (value, _) => value }
  val messageExtractor = builder.add(map)

  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(zip.in0, messageExtractor.out)
})

// And it will be used in your flow as follows
// .via(throttleFlow(FiniteDuration(200, MILLISECONDS)))

此外,由于您要限制对某些 API 的访问,您可能希望以集中方式限制对它的调用。假设您的项目中有多个地方调用相同的外部 API,但是因为来自相同 IP 的调用应该应用于所有地方。对于这种情况,请考虑将MergeHub.source 用于您的(假定的)akka-http 流。每个调用者都会创建并执行新的图表来进行调用。

【讨论】:

  • 我面临的问题是这会限制Seq[Request]请求的流量,但速率限制适用于每个Request。 IE。即使将 Seq[Request] 流量限制为每秒 1 个,我也会更频繁地触发请求。我考虑过压平流,但很难再次聚合它们,而且感觉不对。
【解决方案2】:

这是我最终使用的:

  case class FlowItem[I](i: I, requests: Seq[HttpRequest], responses: Seq[String]) {
    def withResponse(resp: String) = copy(responses = resp +: responses)
    def extractNextRequest = (requests.head, copy(requests = requests.tail))
  }


 def apiFlow[I, O](requestPer: FiniteDuration,
                    buildRequests: I => Seq[HttpRequest],
                    buildOut: (I, Seq[String]) => O
                   )(implicit system: ActorSystem, materializer: ActorMaterializer) = {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val in: FlowShape[I, FlowItem[I]] =
        b.add(Flow[I].map(i => FlowItem(i, buildRequests(i), Seq.empty)))

      val merge: MergePreferredShape[FlowItem[I]] =
        b.add(MergePreferred[FlowItem[I]](1))

      val throttle: FlowShape[FlowItem[I], FlowItem[I]] =
        b.add(Flow[FlowItem[I]].throttle(1, requestPer, 1, ThrottleMode.shaping))

      val prepareRequest: FlowShape[FlowItem[I], (HttpRequest, FlowItem[I])] =
        b.add(Flow[FlowItem[I]].map(_.extractNextRequest))

      val log =
        b.add(Flow[(HttpRequest, FlowItem[I])].map { r => Console.println(s"rquest to ${r._1.uri}"); r})

      val pool: FlowShape[(HttpRequest, FlowItem[I]), (Try[HttpResponse], FlowItem[I])] =
        b.add(Http(system).superPool[FlowItem[I]]())

      val transformResponse: FlowShape[(Try[HttpResponse], FlowItem[I]), FlowItem[I]] =
        b.add(Flow[(Try[HttpResponse], FlowItem[I])].mapAsync(1) {
          case (Success(HttpResponse(StatusCodes.OK, headers, entity, _)), flowItem) =>
            entity.toStrict(1.second).map(resp => flowItem.withResponse(resp.data.utf8String))
        })

      val split: UniformFanOutShape[FlowItem[I], FlowItem[I]] =
        b.add(Partition[FlowItem[I]](2, fi => if (fi.requests.isEmpty) 0 else 1))


      val out: FlowShape[FlowItem[I], O] =
        b.add(Flow[FlowItem[I]].map(fi => buildOut(fi.i, fi.responses)))

        in ~> merge ~> throttle ~> prepareRequest ~> log ~> pool ~> transformResponse ~> split ~> out
              merge.preferred   <~                                                       split

      FlowShape(in.in, out.out)
    }
  }

这个想法是传递元素的次数与请求的次数一样多,并将附加的(尚未执行的)请求与消息一起存储。 split 元素检查是否还有更多请求。

【讨论】:

    猜你喜欢
    • 2013-12-13
    • 1970-01-01
    • 1970-01-01
    • 2018-07-08
    • 2018-05-04
    • 2020-08-02
    • 2019-12-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多