【问题标题】:How does backpressure work in flatMap operator of Project Reactor?Project Reactor 的 flatMap 运算符中的背压如何工作?
【发布时间】:2021-01-12 07:22:57
【问题描述】:

项目反应堆上有背压的概念,这对开发人员来说是透明的。想了解它的实际工作原理。

让我们使用下面的代码块

fun consumeMethod(data: Flux<String>) {
  data
    .flatMap { slowHttpCall(it) }
    .subscribe()
}

我对执行流程的理解是否正确:

  1. 当我们调用 subscribe() 时,它会请求发布者发送所有数据。
  2. 上移至 flatMap,假设它将向发布者请求 32 个元素。
  3. 然后发布者将发送 32 个元素
  4. 再次向下移动到 flatMap,它将为 32 个元素调用 slowHttpCall(),而无需等到每个 http 调用完成。所以现在我们有 32 个正在进行的 http 调用
  5. 到此为止吧

此时,flatMap 会向发布者请求更多元素吗?还是会等到所有 32 个 http 调用完成后再请求更多?还是会等到 1 完成并请求 1?它会要求多少,为什么?

谢谢

【问题讨论】:

    标签: kotlin project-reactor reactive


    【解决方案1】:

    它不会等到所有正在进行的 HTTP 调用完成。当一些正在进行的项目完成时,它将请求新项目。

    flatMap 有一个overloaded version,它允许您定义一个concurrency 参数(默认为256),它限制了最多可以进行多少内部发布者。如果正在进行的发布者的数量少于定义的限制,那么flatMap 将向源发布者请求额外的项目。

    现在,请求率似乎不一致。大多数时候flatMap 会一一请求,有时会请求更多。也许 Reactor 开发人员可以对此提供更多见解。

    您可以使用 log 运算符检查您的确切用例的行为:

    Flux.range(1, 1000)
            .log()
            .flatMap { Mono.delay(Duration.ofMillis(Random.nextLong(1000, 3000))).thenReturn(it) }
            .log(null, Level.WARNING)
            .blockLast()
    

    更多关于这个答案的背压:https://stackoverflow.com/a/57298393/6051176

    【讨论】:

      猜你喜欢
      • 2019-12-09
      • 1970-01-01
      • 2020-09-24
      • 1970-01-01
      • 1970-01-01
      • 2019-12-18
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多