【问题标题】:Issue with combining webclient with Mono and Flux将 webclient 与 Mono 和 Flux 结合使用的问题
【发布时间】:2019-09-21 20:08:10
【问题描述】:

嗨,我有 Flux 并且在迭代每个元素的过程中它会创建新的 mono 。我还有 Flux 之外的其他单声道。并想做以下事情:当通量(具有相应的内部单声道结束)然后做第二个单声道。具有挑战性的部分是 mono 内部的通量从 webclient 请求创建。作为起点,请查看“加载”方法。 基本上没有 webclient 它可以工作,但如果 webclient 在地图内工作,那么之后。 使用spring-boot 2

public WebClient.ResponseSpec sendGetRequest(String path, Object... pathVariables){
   try {
       LOGGER.info("content type {}, url {}, path {}", contentType, url, path);
       WebClient.ResponseSpec responseSpec = sendRequest(HttpMethod.GET, contentType, authorizationToken, url, path, pathVariables);
       return responseSpec;
   }catch (Exception e){
       throw new WebClientProcessingException("Exception when trying to process", e);
   }
}

public Mono<PersonPayload> loadPerson(String  path){
    try {
        LOGGER.info("path {}", path);
        Mono<QuestionDetailsPayload> person = sendGetRequest(path).bodyToMono(PersonPayload.class);
        return person;
    }catch (Exception e){
        throw new WebClientProcessingException("Exception when trying to process",e);
    }
}


public Mono<PersonDomain> getPerson(String path) {
    Assert.notNull(path, "path can't be null");
    try{
        LOGGER.info("path {}" ,path);
        Mono<PersonPayload> personPayload = loadPerson(path);
        return personPayload.map(this::toPersonDomain);
    }catch (Exception e){
        throw new PersonNotFoundException("Exception when trying to get person info" , e);

    }
}

public PersponDomain toPersonDomain(PersonPayload personPayload){
    return modelMapper.map(personPayload, PersonDomain.class);
}

public void load(){
    List<String> outStr = Arrays.asList("out1", "out2","out3");
    Flux flux = Flux.fromIterable(outStr);
    Flux<Mono<PersonDomain>> results =  flux.map(string ->{
        System.out.println(string);
        Mono<PersonDomain> personMono = getPerson("inside");
        Mono<String> result = personMono.map(h ->{
            System.out.println(personMono.getName());
            return personMono.getName() + "_test";
        });
        return result;
    });
    Mono<String> second = Mono.just("second");
    results.then(second);
    results.subscribe(stringMono -> {
        stringMono.subscribe();
    });
    second.subscribe( s->{
        System.out.println(s);
    });
}

Gradle 依赖:

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-validation'

implementation 'org.postgresql:postgresql'
implementation 'org.springframework.boot:spring-boot-starter-jooq'
implementation 'org.jooq:jooq-codegen'

implementation 'org.modelmapper:modelmapper:2.3.0'
implementation 'org.modelmapper:modelmapper:2.3.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

implementation 'com.google.code.gson:gson:2.8.5'

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.powermock:powermock-module-junit4:2.0.0'
testImplementation 'org.powermock:powermock-api-mockito2:2.0.0'}

【问题讨论】:

  • 您到底想做什么?将Mono 的响应与发出的每个Flux 元素结合起来?我可以看到根据您的代码 sn-p 返回类型的第一个 Flux 应该是 Flux&lt;Mono&lt;String&gt;&gt; 而不是 Flux&lt;Mono&lt;PersonDomain&gt;&gt;。此外,在使用 Webclient 时,您应该使用 flatMap。

标签: java spring-boot spring-webflux project-reactor reactor-netty


【解决方案1】:

Flux#map是同步操作,不订阅返回的对象。

您应该使用Flux#flatMap/Flux#concatMap/Flux#flatMapSequential/Flux#switchMap。这些运营商将订阅返回的Publisher

【讨论】:

  • 同意地图同步,但是当我使用 Webclient 时,即使使用平面地图也存在我的问题
  • 即使使用 flatmap ,如果我在没有 Webclient 的情况下更改 getPerson ,它也会起作用。请查看我用来解决问题的方法。我只是不明白为什么没有等待flux完全完成但订阅了。
【解决方案2】:

我的情况的解决方案 使用:

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

而不是那时。 它处理值和错误,但在序列成功完成时也会执行一些代码。

【讨论】:

    猜你喜欢
    • 2020-05-04
    • 1970-01-01
    • 1970-01-01
    • 2021-09-30
    • 2019-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多