【问题标题】:What is the real world case for Mono.defer()?Mono.defer() 的真实案例是什么?
【发布时间】:2021-07-14 13:33:54
【问题描述】:

我知道Mono.defer() 做了什么,但我应该什么时候使用它?我知道其中一个用例是在返回 Mono 的函数中推迟一些阻塞副作用,但这通常是一种不好的做法(将副作用放在返回 MonoFlux 的函数中)。当我想在Mono 中包含一些阻塞代码时,有Mono.fromCallable()。那么使用Mono.defer()的最佳案例是什么?

【问题讨论】:

标签: java reactive-programming reactor reactive-streams


【解决方案1】:

Mono.fromCallable() 用于定期延迟服务调用。 Mono.defer() 会生成Mono,并且只有在您订阅 时才会这样做。因此,您可以决定创建这个或那个Mono,而不是在合成阶段,而是在目标订阅上。

【讨论】:

  • 我明白,但我为什么需要它?用例是什么?
  • 当您需要创建 Mono 时,不是像使用 Mono.just() 那样立即创建,而是稍后,仅当订阅发生并且结果可能取决于该订阅者上下文时。例如,您需要传播一个真正依赖于经过身份验证的订阅者的安全上下文。
【解决方案2】:

@a.khakh

帮助我的一个非常简单的用例是在另一个线程中执行阻塞代码,即处理该阻塞调用,以便主线程不会阻塞我。

我看到需要使用 defer 来订阅另一个线程,在本例中是在项目反应器提供并由 Simon Basle 编程的调度程序中。

如果您将defer 加上subscribeOnSchedulers.boundedElastic 组合使用,您将使您的代码非阻塞,或者它不会阻塞您的应用程序,从而消除主程序中的争用-线程。


下图是Combo的第三个组件,内部使用了一个defer加一个Scheduler,以免阻塞应用。

  • 忽略名为ui.access() 的方法,它是vaadin 框架的一部分。
Mono.defer(()  -> this.reactiveRandomNumbers.monoFrecuency(event.getValue().getSize())) <1>
                        .doOnEach(signal -> log.info("Thread name doOnNext(): {}", Thread.currentThread().getName()))
                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe(subscribeMap -> {
                            ui.access(() -> {
                                log.info("Thread name subscribe(): {}", Thread.currentThread().getName());
                                this.execute(event.getValue().getSize(), e -> subscribeMap);
                            });
                        });

【讨论】:

  • 那么,在您的情况下,阻塞代码是什么? SECURE_RANDOM.ints?
  • 是的,它在主线程中执行,如果您执行 Blockhound,您可以看到在内部进行了阻塞调用并且它会检测到它。有我上面给你的方法,你可以去掉Mono.defer调用它,你的调用会被阻塞,它需要defersubscribeOn
  • 这与使用Mono.fromCallable() 的更简单的方法有何不同? Mono.fromCallable(() -&gt; SECURE_RANDOM.ints(...)).doOnEach(...).subscribeOn(...).subscribe(...)
  • 使用 fromCallable 也是有效的,我可以在另一个调度程序中处理它。使用 defer 他接收发布者作为参数,您可以通过查看行为来测试上面的应用程序,只需进行简单的更改。
  • 我会说,对装配时间进行昂贵的计算是一种反模式,所以在这里看不到 defer() 的任何好处,我们又回到了原点 :)
【解决方案3】:

有时你需要懒惰地计算一些同步的东西。

例如,我们有一个异步服务

public interface SomeService {

    Mono<User> getUserByName(String name);

    Mono<CreateUserResult> createUser(User user);

}

和一个端点,它在此服务中查找用户并在未找到该用户时创建。但在创建之前,必须在特殊线程池中同步执行一些繁重的计算。所以

someService.getUserByName(name)
                .switchIfEmpty(Mono.defer(() -> {
                    var someHeavyCalculatedData = localHeavyService.doSomethingNecessary(name);
                    return someService.createUser(new User(name, someHeavyCalculatedData));
                }).subscribeOn(specialScheduler));

当然,你可以说我们可以使用Mono.fromCallable 来代替

someService.getUserByName(name)
                .switchIfEmpty(Mono.fromCallable(() -> localHeavyService.doSomethingNecessary(name))
                        .subscribeOn(specialScheduler)
                        .flatMap(someHeavyCalculatedData -> someService.createUser(new User(name, someHeavyCalculatedData)))
                );

但是反应链中的每个方法调用都会导致在内存中创建大量对象。好吧,一次调用你,可能不会注意到任何事情,但是负载较高时频繁分配内存,然后由 GC 释放它会导致你的应用程序变慢。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-09-21
    • 2011-06-05
    • 2017-02-17
    • 2010-11-03
    • 2019-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多