【问题标题】:Combine multiple Monos into a flux将多个 Mono 组合成一个通量
【发布时间】:2019-05-25 19:56:42
【问题描述】:

我正在附加一个带有 flatMap 的 Flux,但如果我添加额外的 flatMap,则只会返回最后一个。

// Here is an example of the Mono function

private Mono<MyType> appendFirstMono(Group group) {
    return Mono.just(group)
    .map(MyType::new)
        .flatMap(g -> functionZ(group)
        .map(g::setField));
} 


//This works as expected

public Flux<MyType> function1() {

    return returnData(id)
            .thenMany(service.getData(id))
            .flatMap(this::appendFirstMono);
}

//This does not and only returns the last mono (3rd)

public Flux<MyType> function1() {

    return returnData(id)
            .thenMany(service.getData(id))
            .flatMap(this::appendFirstMono)
            .flatMap(this::appendSecondMono)
            .flatMap(this::appendThirdMono);
}

//I've attempted to fix with this... Doesn't work as expected.   

public Flux<MyType> function1() {

    return returnData(id)
            .thenMany(service.getData(id))
            .flatMap(x -> {
                return Flux.merge(
                    appendFirstMono(x),
                    appendSecondMono(x),
                    appendThirdMono(x)
                );
            });
}

我需要处理通量上的每个 Mono 函数,但我似乎无法让每个函数正确执行和返回。

【问题讨论】:

  • 你看到这个答案了吗stackoverflow.com/questions/42007841/…
  • 你指的是mergeSequential吗?那是行不通的。有序函数调用中的最后一个单声道是唯一返回其值的单声道。

标签: spring project-reactor


【解决方案1】:

你可以尝试 concat 一个一个地处理单声道,看看我的例子

        Flux.concat(getMono(0),getMono(1),getMono(2))
            .map(integer -> {
                System.out.println(integer);
                return integer;
            })
            .subscribe();

}

private Mono<Integer> getMono(Integer a) {
    return Mono.just(a)
            ;
}

这将打印 0,1,2

【讨论】:

  • 上面出现“类型不匹配:无法从 Disposable 转换为 Publisher extends MyType>”错误。
  • 您是否更改了我的代码,因为我发布了单词
  • 是的,我把它加到我这里了--------> public Flux function1() { return returnData(id) .thenMany(service.getData(id)) .flatMap (x -> { return Flux.concat( appendFirstMono(x), appendSecondMono(x), appendThirdMono(x) ).map(integer -> { System.out.println(integer); return integer; }) .subscribe() ; }); }
  • service.getdata 返回什么
  • 它返回一个 Flux 是一个 API 项,用于扩展 类,其中包含来自这些单声道的一些字段。供参考..
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-02-16
  • 2021-02-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-08
相关资源
最近更新 更多