【问题标题】:Why doesn't this map() or flatMap() finish the pipeline even with then()?为什么即使使用 then() 这个 map() 或 flatMap() 也不完成管道?
【发布时间】:2021-08-06 11:44:14
【问题描述】:

我有这个方法

private Mono<String> getToken(String id){
....
}

和调用者

List<String> tokens = new ArrayList<>();
getToken(id).map(s -> tokens.add(s)).then();

它不会将返回的token 添加到列表中 为什么这条管道永远不会完成,而返回的 token 永远不会填充到列表中?

我已经尝试过subscribe()flatMap(),但问题仍然存在。我设法将令牌填充到列表中的唯一方法是使用block(),这反过来又引发了另一个异常。

【问题讨论】:

    标签: java reactive-programming spring-webflux project-reactor


    【解决方案1】:

    嗯,你有点回答了你自己的问题:

    我设法将令牌填充到列表中的唯一方法是使用 block()

    您可能会将 Project Reactor API 与 JavaScript Promises 混淆。它的工作方式不同。
    我强烈建议阅读the documentation

    确实,执行反应式管道的唯一方法是订阅它subscribe() 会这样做,但它是异步的,因此它不会等待计算完成。
    block() 是您正在寻找的:它订阅您的 Mono 并等待它结束。

    其次,map() 将接收到的值转换为其他值;在你的情况下,在 map() 之后你会得到一个 Mono&lt;Boolean&gt; 因为 ArrayList#add() 返回一个布尔值。如果不需要改造,可以拨打doOnNext()

    List<String> tokens = new ArrayList<>();
    getToken(id).doOnNext(tokens::add).block();
    

    这将执行Mono 并将结果存储在List 中,然后再继续执行。

    【讨论】:

    • java.lang.IllegalStateException: block()/blockFirst()/blockLast() 是阻塞的,线程 reactor-http-nio-4 不支持
    • @MeladBasilius 这是另一个问题,你需要小心你阻塞的地方。你为什么在线程 reactor-http-nio-4 中?这意味着您已经处于反应式管道中……
    【解决方案2】:

    getToken() 方法中包含的逻辑未知,可能会延迟或调用需要时间的同步服务,测试时可能会有所不同。

    • 不要在代码中使用 reactor 进行阻塞调用,您可以通过 BlockHound 代理了解是否有阻塞调用

    • 很多时候您需要使用CountDownLatch 才能完成订阅。

    • 还使用简单的Thread.sleep() 使主线程休眠,以便终止订阅,但我更喜欢它上面的选项..


    reactor-test 依赖项

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    

    带有反应器的基本 TDD

    private Mono<String> getToken(String id) {
        return Mono.just(id);
    }
    
    @Test
    @DisplayName("get token version 1")
    void testGetToken() {
        List<String> tokens = new ArrayList<>();
        final String ui = UUID.randomUUID().toString();
        getToken("Token id " + ui)
                .map((String token) -> {
                    tokens.add(token);
                    return token;
                })
                //.then() From Documentation, This operator discards the element from the source.
                // do not invoke then () here it is not necessary and by the way it returns a Mono <Void> you want a String token
                .subscribe(tokenSubcription -> log.info("Token id {}", tokenSubcription));
    }
    
    @Test
    @DisplayName("Get token version 2")
    void testGetToken2() {
        final String ui = UUID.randomUUID().toString();
        List<String> tokens = new ArrayList<>();
        Mono<String> monoToken = getToken("Token id" + ui)
                .map((String token) -> {
                    tokens.add(token);
                    return token;
                });
    
        StepVerifier.create(monoToken)
                .expectNextMatches(e -> e.equals("Token id" + ui))
                .verifyComplete();
    }
    
    @Test
    @DisplayName("Get token version 3 with CountDownLatch")
    void testGetToken3() {
        List<String> tokens = new ArrayList<>();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final String ui = UUID.randomUUID().toString();
    
        final AtomicReference<String> atomicReference = new AtomicReference<>();
        getToken("Token id " + ui)
                        .map((String token) -> {
                            tokens.add(token);
                            return token;
                        })
                        .doOnTerminate(countDownLatch::countDown)
                        .log()
                        .subscribe(atomicReference::set);
        try {
            countDownLatch.await();
            log.info("Thread in doOnTerminate {}", Thread.currentThread().getName());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    
        assertThat(atomicReference.get()).isEqualTo("Token id ".concat(ui));
    
    }
    

    另一个没有 StepVerifier 的例子

    private Mono<List<String>> getMonoToken() {
            List<String> tokens = new ArrayList<>();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final String ui = UUID.randomUUID().toString();
    
            getToken("Token id " + ui)
                    .doOnTerminate(countDownLatch::countDown)
                    .subscribe(tokens::add);
    
            try {
                countDownLatch.await();
                log.info("Thread in doOnTerminate {}", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Mono.just(tokens);
    }
    
    @RepeatedTest(10)
    void testMonoToken2() {
            this.getMonoToken()
                    .subscribe(token -> log.info("Token id {}", token));
    }
    

    使用订阅

    private Mono<List<String>> getMonoToken() {
           List<String> tokens = new ArrayList<>();
           final String ui = UUID.randomUUID().toString();
           return getToken("Token id " + getUUI())
                   .map(token -> {
                       tokens.add(token);
                       return tokens;
                   });
    }
    
    @RepeatedTest(10)
    void testMonoToken2() {
           this.getMonoToken()
                   .subscribe(token -> log.info("Token id {}", token));
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-12
      • 2019-12-20
      • 2015-11-30
      • 2012-05-31
      • 1970-01-01
      相关资源
      最近更新 更多