【问题标题】:what does Mono.defer() do?Mono.defer() 做什么?
【发布时间】:2019-09-21 03:50:00
【问题描述】:

我在一些 Spring webflux 代码中遇到过 Mono.defer()

我在文档中查找了方法,但不明白解释:

"创建一个 Mono 提供者,它将提供一个目标 Mono 来订阅 为下游的每个订阅者”

请给我一个解释和一个例子。有没有我可以参考的一堆 Reactor 示例代码(他们的单元测试?)。

谢谢

【问题讨论】:

  • 你检查过javadoc吗? Flux/Monos 上的大多数方法都包含一个图表,说明它如何与反应流一起工作。
  • 是的,上面的引用来自 javadoc。我个人觉得这种语言很难理解;一些琐碎的代码示例会对我有所帮助。我正在克隆 reactor-core 项目并查看单元测试,看看是否有帮助。
  • 示例代码、链接和注释的组合,RxJava 的 Observable 应该在阅读其示例时替代 Mono 或 Flux 对我有帮助。
  • reactor 代码很难阅读和预测,至少在最初,这些图表也是我需要教程的另一种语言 :)

标签: java spring-webflux project-reactor


【解决方案1】:

这有点过于简单了,但从概念上讲,Reactor 源要么是懒惰的,要么是急切的。更高级的请求,如 HTTP 请求,预计会被延迟评估。另一方面,像Mono.justFlux.fromIterable 这样的最简单的人都渴望。

我的意思是,调用Mono.just(System.currentTimeMillis()) 将立即调用currentTimeMillis() 方法并捕获结果。所述结果仅在订阅后由Mono 发出。多次订阅也不会改变值:

Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0

Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0

defer 操作符让这个源变得懒惰,每次有新订阅者时重新评估 lambda 的内容

Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //invoked currentTimeMillis() here and returns t10

Thread.sleep(7_000);
//time == t17
clock.block(); //invoke currentTimeMillis() once again here and returns t17

【讨论】:

  • 当我理解这个答案时,我从不喜欢这种解释,因为它有点暗示Mono.just(System.currentTimeMillis()) 的库选择将在调用时立即执行。但这就是 java 的工作方式,没有人会想到方法调用会以某种方式被库删除和延迟。
  • 是的,Java 就是这样工作的,没有包装在 lambda 中的方法调用不能神奇地变得懒惰。我不打算让它像图书馆正在做出“选择”
  • @SimonBaslé 我编写了返回 Flux 的 Netty HTTP 客户端(为什么我不能使用 Reactor Netty 是另一个话题)。在有人订阅之前,我不希望发生任何排放,并且正在使用defer,如下所示。这是正确的用法吗? publisher = UnicastProcessor.create&lt;ByteBuf&gt;(); chunks = Flux.defer { publisher }return chunks 给来电者并在publisher 上发布
  • 看起来您的库本身保证了 1 个订阅者(否则您会看到单播处理器出现错误)。我认为包装在延迟中没有任何价值,unicastProcessor 已经实现了 Flux。还有 Flux.create 你应该看看
【解决方案2】:

用简单的话 如果您在第一个视图中看到它就像 Mono.just() 但不是。 当你运行 Mono.just() 时,它会立即创建一个 Observable(Mono) 并重用它,但是当你使用 defer 时,它不会立即创建它,它会在每个订阅中创建一个新的 Observable。

一个用例看差异

    int a = 5;
@Override
public void run(String... args) throws Exception {

    Mono<Integer> monoJust = Mono.just(a);
    Mono<Integer> monoDefer = Mono.defer(() -> Mono.just(a));

    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));

    a = 7;
    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));
}

打印:

5
5
5
7

如果你看到 mono.just 立即创建了 observable,即使值发生了变化,它也不会改变,但是 defer 在 subscribe 中创建 observable,所以你将使用当前的 onSubscribe 值

【讨论】:

  • 感谢@RichardKollcaku,这样单声道就有了其底层状态的快照,而 defer 是说,每次调用 subscribe 时获取当前状态
  • 您的解释并不完全正确。没有诸如“创建新的 Observable”之类的东西。 just 和 defer 创建一个 Observable。 Mono.just 返回每个订阅的值,而 Mono.defer 评估 Supplier 函数。
  • 我说的不是 Observable 作为对象,而是 Observable 作为逻辑。 Mono 是可观察的。 Mono.just create new Observable 这意味着创建新的 Mono @htn
  • @jrender 在Mono.just() 的情况下它不能真正应用于状态快照。在这个例子中,他使用原始类型,所以结果不会改变,但是如果你使用一个对象(例如列表)并在 2 个订阅之间改变它(添加一个新值),你将有 2 个不同的结果,但它仍然同一个对象。
  • @RicardKollcaku 别担心,我很清楚它是如何工作的 defer operator。 Spring Reactor 不存在术语Observable,因此 Observable = Mono |助焊剂。
【解决方案3】:

我正在尝试 defer 用于不同的用例。编写以下代码来检查和共享,因为它可能对其他人有所帮助。我的用例是链接两个Monos,并确保第一个在第二个被占用之前完成。第二个包含一个阻塞调用,其结果用于响应Monoemptyerror 响应。如果没有defer,无论第一个结果如何,都会执行我的阻塞调用。但是在使用defer 时,阻塞调用仅在第一个Mono 完成时执行。代码如下:

public static void main(String[] args) {
    long cur = System.currentTimeMillis();
    boolean succeed = true;

    Mono<Integer> monoJust = Mono.create(consumer -> {
        System.out.println("MonoJust inside " + (System.currentTimeMillis() - cur));
        if (succeed) {
            consumer.success(1);
        } else {
            consumer.error(new RuntimeException("aaa"));
        }
    });

    Mono<String> monoJustStr = Mono.create(consumer -> {
        System.out.println("MonoJustStr inside: " + (System.currentTimeMillis() - cur));
        consumer.success("one");
    });

    System.out.println("##1##: Begin");
    monoJust.then(evaluator() ? Mono.empty() : monoJustStr).subscribe(d -> System.out.println("##1##: "+d), e-> System.err.println(e));
    System.out.println("##1##: Done: "+(System.currentTimeMillis() - cur));

    System.out.println("\n\n\n##2##: Begin");
    monoJust.then(Mono.defer(() -> evaluator() ? Mono.empty() : monoJustStr)).subscribe(d -> System.out.println("##2##: "+d), e-> System.err.println(e));
    System.out.println("##2##: Done: " + (System.currentTimeMillis() - cur));

}

private static boolean evaluator() {
    System.out.println("Inside Evaluator");
    return false;
}

succeed=true 的输出 - 观察“Inside Evaluator”和“MonoJust inside”的顺序

##1##: Begin
Inside Evaluator
MonoJust inside 540
MonoJustStr inside: 542
##1##: one
##1##: Done: 542



##2##: Begin
MonoJust inside 544
Inside Evaluator
MonoJustStr inside: 544
##2##: one
##2##: Done: 544

下面是succeed = false 的输出 - 请注意未调用评估器。

##1##: Begin
Inside Evaluator
MonoJust inside 565
java.lang.RuntimeException: aaa
##1##: Done: 567



##2##: Begin
MonoJust inside 569
java.lang.RuntimeException: aaa
##2##: Done: 569

【讨论】:

    【解决方案4】:

    初学者的简单答案:

    当对 monoJust 变量调用 subscribe 时,它​​将打印三次随机整数。但是在 monoDefer 变量上调用 subscribe 时,它​​每次都可以打印一个随机数。

       Mono<Integer> justMono = Mono.just((new Random()).nextInt(10));
    
        //this will print same random number thrice
        for(int i=0;i<3;i++)
            justMono.subscribe(x -> {System.out.println("Just Mono: " + x);});
    
        Mono<Integer> deferMono = Mono.defer(() -> Mono.just((new Random()).nextInt(10)));
    
        //this might print three different random numbers
        for(int i=0;i<3;i++)
            deferMono.subscribe(x -> {System.out.println("Defer Mono: " + x);});
    

    在 Mono.just() 中,实例化仅在第一次订阅发生时发生一次。在 Mono.defer() 中,每次调用订阅时都会发生实例化。

    如需更多参考,请查看: https://www.youtube.com/watch?v=eupNfdKMFL4&t=381s 3:15 分钟

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-09-30
      • 2014-04-29
      • 1970-01-01
      • 1970-01-01
      • 2016-11-20
      • 1970-01-01
      • 2011-08-23
      • 2011-01-03
      相关资源
      最近更新 更多