【问题标题】:How to design/create reactive method with Project Reactor如何使用 Project Reactor 设计/创建反应方法
【发布时间】:2019-11-11 16:14:46
【问题描述】:

我刚开始使用基于项目反应器的响应式框架,例如 spring-webflux,对此我有一些疑问。

问题 1:

示例 1:

  public Mono<String> doSome(String str){
    String key = str.substring(0, 5).toLowerCase() + str.substring(6);
    return getValueFromRedis(key);
  }

  public Mono<String> getValueFromRedis(String key){
    return reactiveRedisOperations().opsForHash().get("TestCache", key);
  }

示例 2:

  public Mono<String> doSome(String str){
    return Mono.fromCallable(() -> {
      String key = str.substring(0, 5).toLowerCase() + str.substring(6);
      return getValueFromRedis(key);
    }).flatMap(stringMono -> stringMono);
  }

  public Mono<String> getValueFromRedis(String key){
    return reactiveRedisOperations().opsForHash().get("TestCache", key);
  }

两个例子之间有区别还是两者都可以接受。

问题 2:

示例 1:

  @PostMapping(value = "/greet")
  public Mono<String> greet(String name) {
    return Mono.fromCallable(() -> aMethod(name));
    // or return Mono.just(aMethod(name));
  }

  public String aMethod(String name){
    return "Hello: " + name;
  }

示例 2:

  @PostMapping(value = "/greet")
  public Mono<String> greet(String name) {
    return aMethod(name);
  }

  public Mono<String> aMethod(String name){
    return Mono.just("Hello: " + name);
  }

我知道第二个问题很奇怪,但我想知道所有方法都应该返回 Mono 或 Flux 还是可以像 Question2/Example1 一样使用。

【问题讨论】:

  • 删除 .flatMap(stringMono -&gt; stringMono) - 这没有任何意义。

标签: java spring-boot project-reactor


【解决方案1】:

问题1:是的,有区别。在示例 1 中,您在 Mono.fromCallable 之外创建 String key。这没什么大不了的,但如果是昂贵的操作,你会减慢一切。

这个逻辑reactiveRedisOperations().opsForHash() 也是在Mono.fromCallable 之外执行的。同样的事情 - 如果这很昂贵,你就会减慢一切。

问题 2:与问题 1 相同。Mono.just 接受常规对象,而不是稍后会在需要时调用的对象(例如 CallableSupplier)。因此,当使用Mono.just 时,您将立即为参数初始化付出代价。

在您的示例中几乎没有任何区别,但 MonoFlux 通常用于以异步方式链接昂贵的操作,因此不会阻塞任何内容,例如数据库调用或对其他外部服务的调用。请看下面我的示例,其中 sleep 是对外部调用的模拟。注意打印语句的顺序。

public String superLongMethod() {
  System.out.println("superLongMethod");
  Thread.sleep(10000);
  return "ok";
}

System.out.println("before");
Mono.just(superLongMethod());
System.out.println("after");

// printed output is - before - superLongMethod - after

-----------------------------------------------------------------

System.out.println("before");
Mono.fromCallable(() -> superLongMethod());
System.out.println("after");

// printed output is - before - after - superLongMethod

-----------------------------------------------------------------

System.out.println("before");
String key = superLongMethod();
System.out.println("after");
return getValueFromRedis(key);

// printed output is - before - superLongMethod - after

-----------------------------------------------------------------

System.out.println("before");
Mono<String> mono = Mono.fromCallable(() -> {
  String key = superLongMethod();
  return getValueFromRedis(key);
}).flatMap(stringMono -> stringMono);
System.out.println("after");
return mono;

// printed output is - before - after - superLongMethod

【讨论】:

  • '这个逻辑reactiveRedisOperations().opsForHash()也是在Mono.fromCallable之外执行的。同样的事情——如果这很昂贵,你就会放慢一切。但是 ReactiveRedisOperations 是一个非阻塞操作(spring-data-redis-reactive)。它默认使用反应器。但我明白重点。感谢您的解释:)
  • 这就是为什么我说if :) 另外我认为只有get 是非阻塞的,之前的两种方法只是获取实际对象的一种方法。
  • 你会得到 before - after 输出而不是 before - after - superLongMethod,因为没有人订阅你的 Mono。
  • fromCallable 在订阅者订阅时在不同的线程中执行语句,而 Just 方法调用调用线程本身中的语句,并在初始化期间执行此操作,但在订阅时发出数据。我的理解正确吗?请纠正我
【解决方案2】:

fromCallable 在 Mono 有 一个新订阅者 时执行 lambda,而 just 在实例化时捕获它的参数并将其发送给它的每个订阅者。

以下代码说明了区别:

private Instant getTime() {
    final Instant now = Instant.now();
    System.out.println("getTime(): " + now);
    return now;
}

@Test
public void just() throws InterruptedException {
    final Mono<Instant> mono = Mono.just(getTime())
            .doOnNext(instant -> System.out.println(instant));

    Thread.sleep(500);

    mono.subscribe();

    Thread.sleep(500);

    mono.subscribe();

    /* output is
        getTime(): 2019-08-14T22:47:06.823Z
        2019-08-14T22:47:06.823Z
        2019-08-14T22:47:06.823Z
    */
}

@Test
public void fromCallable() throws InterruptedException {
    final Mono<Instant> mono = Mono.fromCallable(() -> getTime())
            .doOnNext(instant -> System.out.println(instant));

    Thread.sleep(500);

    mono.subscribe();

    Thread.sleep(500);

    mono.subscribe();

    /* output is
        getTime(): 2019-08-14T22:47:13.947Z
        2019-08-14T22:47:13.947Z
        getTime(): 2019-08-14T22:47:14.447Z
        2019-08-14T22:47:14.447Z
    */
}

(你也可以看看RxJava Single.just() vs Single.fromCallable()?,它是关于 RxJava 但它的 API 几乎与 Reactor 相同)。

所以你的问题 1 的答案是:是的,有区别。 在示例 1 中,doSome 的调用将立即导致从 Redis 中获取异步值。在示例 2 中,在有人订阅 doSome 方法返回的 Mono 之前,不会涉及 Redis。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-09-27
    • 2018-11-18
    • 2017-06-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-07
    相关资源
    最近更新 更多