【问题标题】:How does backpressure work in Project Reactor?背压在 Project Reactor 中是如何工作的?
【发布时间】:2019-12-09 06:55:03
【问题描述】:

我一直在 Spring Reactor 中工作,并且之前的一些测试让我想知道 Fluxes 在默认情况下如何处理背压。我知道存在 onBackpressureBuffer 等,我也读过RxJava defaults to unbounded until you define whether to buffer, drop, etc.

那么,谁能帮我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么?

我尝试搜索答案,但没有找到任何明确的答案,只有 Backpressure 的定义或上面链接的 RxJava 答案

【问题讨论】:

    标签: spring rx-java rx-java2 project-reactor reactor


    【解决方案1】:

    什么是背压?

    背压或消费者向生产者发出信号的能力 排放率太高 - Reactor Reference

    当我们谈论背压时,我们必须将来源/发布者分为两组:尊重订阅者需求的一组,以及忽略它的一组。

    通常热门来源不尊重订阅者的需求,因为它们经常生成实时数据,例如收听 Twitter 提要。在此示例中,订阅者无法控制创建推文的速度,因此很容易被淹没。

    另一方面,冷源通常会在订阅发生时按需生成数据,例如发出 HTTP 请求然后处理响应。在这种情况下,您调用的 HTTP 服务器只会在您发送请求后发送响应。

    重要的是要注意这不是一条规则:不是每个热源都忽略需求,也不是每个冷源都尊重它。您可以阅读更多关于热源和冷源的信息here

    让我们看一些可能有助于理解的例子。

    尊重需求的发布者

    给定一个产生从 1 到 Integer.MAX_VALUE 的数字的 Flux,并给出一个处理单个元素需要 100 毫秒的处理步骤:

    Flux.range(1, Integer.MAX_VALUE)
        .log()
        .concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
        .blockLast();
    

    让我们看看日志:

    [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
    [ INFO] (main) | request(1)
    [ INFO] (main) | onNext(1)
    [ INFO] (main) | request(1)
    [ INFO] (main) | onNext(2)
    [ INFO] (parallel-1) | request(1)
    [ INFO] (parallel-1) | onNext(3)
    [ INFO] (parallel-2) | request(1)
    [ INFO] (parallel-2) | onNext(4)
    [ INFO] (parallel-3) | request(1)
    [ INFO] (parallel-3) | onNext(5)
    

    我们可以看到,在每个 onNext 之前都有一个请求。请求信号由concatMap 操作员发送。当concatMap 完成当前元素并准备好接受下一个元素时,会发出信号。源只有在收到下游的请求时才发送下一项。

    在这个例子中,背压是自动的,我们不需要定义任何策略,因为操作员知道它可以处理什么并且源尊重它。

    定义了忽略需求且没有背压策略的发布者

    为了简单起见,我为本示例选择了一个易于理解的冷发布者。它是Flux.interval,它在每个指定的时间间隔发出一个项目。这个冷漠的发布者不尊重需求是有道理的,因为看到项目以不同的、比最初指定的更长的间隔发出会很奇怪。

    让我们看看代码:

    Flux.interval(Duration.ofMillis(1))
        .log()
        .concatMap(x -> Mono.delay(Duration.ofMillis(100)))
        .blockLast();
    

    Source 每毫秒发出一个项目。订阅者能够每 100 毫秒处理一项。很明显,订阅者无法跟上制作者的步伐,我们很快就会收到类似这样的异常:

    reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
        ...
    

    我们可以做些什么来避免这个异常?

    定义了忽略需求和背压策略的发布者

    默认的背压策略是我们在上面看到的:以错误终止。 Reactor 不会对我们强制执行任何错误处理策略。当我们看到这种错误时,我们可以决定哪一个最适合我们的用例。

    你可以在Reactor reference找到其中的几个。

    对于这个例子,我们将使用最简单的一个:onBackpressureDrop

    Flux.interval(Duration.ofMillis(1))
        .onBackpressureDrop()
        .concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
        .doOnNext(a -> System.out.println("Element kept by consumer: " + a))
        .blockLast();
    

    输出:

    Element kept by consumer: 0
    Element kept by consumer: 1
    Element kept by consumer: 2
    Element kept by consumer: 3
    Element kept by consumer: 4
    Element kept by consumer: 5
    Element kept by consumer: 6
    Element kept by consumer: 7
    Element kept by consumer: 8
    Element kept by consumer: 9
    Element kept by consumer: 10
    Element kept by consumer: 11
    Element kept by consumer: 12
    Element kept by consumer: 13
    Element kept by consumer: 14
    Element kept by consumer: 15
    Element kept by consumer: 16
    Element kept by consumer: 17
    Element kept by consumer: 18
    Element kept by consumer: 19
    Element kept by consumer: 20
    Element kept by consumer: 21
    Element kept by consumer: 22
    Element kept by consumer: 23
    Element kept by consumer: 24
    Element kept by consumer: 25
    Element kept by consumer: 26
    Element kept by consumer: 27
    Element kept by consumer: 28
    Element kept by consumer: 29
    Element kept by consumer: 30
    Element kept by consumer: 31
    Element kept by consumer: 2399
    Element kept by consumer: 2400
    Element kept by consumer: 2401
    Element kept by consumer: 2402
    Element kept by consumer: 2403
    Element kept by consumer: 2404
    Element kept by consumer: 2405
    Element kept by consumer: 2406
    Element kept by consumer: 2407
    

    我们可以看到在前 32 个项目之后有一个相当大的跳跃到 2400。由于定义的策略,它们之间的元素被丢弃了。

    关键要点

    • 背压通常是自动的,我们不需要做任何事情,因为我们按需获取数据。
    • 如果来源不尊重订阅者的需求,我们需要定义一个策略来避免终止错误。

    更新: 有用的阅读:How to control request rate

    【讨论】:

    • 首先,感谢您的回复!您突出显示了我找到的一些示例,并完美地回答了我的问题。除此之外,我想知道您是否有任何关于如何在 Reactive Streams 中处理静态数据的参考资料——热流和冷流的处理似乎与我使用 Reactor 的经验有很大不同,而且我只真正使用过“faux”-hot流是冷流或处理冷流的结果。
    • 为什么我的程序没有记录 print "[ INFO] (main) | request(1) "。也许是因为反应堆核心发生了一些变化?
    猜你喜欢
    • 2021-01-12
    • 1970-01-01
    • 1970-01-01
    • 2016-05-08
    • 2020-09-24
    • 1970-01-01
    • 2021-06-12
    • 2020-09-14
    • 2021-07-14
    相关资源
    最近更新 更多