【问题标题】:Reduce Flux to form Mono减少通量以形成单声道
【发布时间】:2019-12-30 01:52:48
【问题描述】:

我对 Spring Reactive 编程有点陌生。我正在尝试从 I/O 获得通量,whi ch 返回一个对象列表以及从我的服务返回 Mono 的对象列表。

Flux<Obj1> -> Mono<Obj2>

Obj1
{
"a" : "123",
"combine" : "456"
"combine2" : "789"
}

Flux<Obj1> has multiple objects

Obj2
{
"a" : "123"
"combine" : {
              "456" : "1"
            },
"combine2" : {
               "789" : "2"
             }
}

Mono<Obj2> is a consolidation of flux with the Combiner keys.

为了实现这一点,我最初的方法是确保使用 then 和 after 操作数据完成 Flux。

Flux.just(obj1a,obj1b,obj1c)
    .then();

但是上面的语句返回了一个 void Mono,不确定 thenMany 在这种情况下如何工作。

我觉得这里漏掉了什么,完成后我应该如何控制 Flux 对象。

【问题讨论】:

  • 你需要的是 collect() 或 reduce()。检查 API 文档。
  • 我的错是减少。更新了问题。
  • @VihangShah 尝试发布可以轻松运行的代码示例。在这种情况下,伪代码并没有真正的帮助。

标签: spring spring-webflux project-reactor


【解决方案1】:

为了实现这一点,我最初的方法是确保使用 then 和 after 操作数据完成 Flux。

这是反应式编程的错误思维方式 - 您需要在数据流经通量时对其进行修改。 then() 方法将完全忽略来自通量的结果,并在完成时仅输出其他一些不相关的Mono

如果您想要获取某个元素的Flux,并希望将其缩减为某个其他元素的Mono,您很可能需要reduce() 方法。在您的情况下,这将采用初始 Obj2,然后是 BiFunction,其目的是采用中间 Obj2,在 Flux 中采用 Obj1,然后生成更新的 Obj2。然后reduce() 运算符将对整个流应用此缩减,最后为您提供Mono&lt;Obj2&gt;

从您的代码中并不能立即看出您想要具体实现什么,但以下是一个相关示例(为了简洁起见,使用 lombok):

@Data
@AllArgsConstructor
class Obj1 {

    private String a;
    private String combine;
    private String combine2;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class Obj2 {

    private String a;
    private Map<String, Integer> combine = new HashMap<>();
    private Map<String, Integer> combine2 = new HashMap<>();

}

public class NewClass {

    public static void main(String[] args) {
        Flux<Obj1> flux = Flux.just(
                new Obj1("123", "456", "789"),
                new Obj1("123", "456", "789"),
                new Obj1("123", "455", "789"));

        Mono<Obj2> mono = flux.reduce(new Obj2(), (o2, o1) -> {
            Map<String, Integer> combine = new HashMap<>(o2.getCombine());
            combine.put(o1.getCombine(), combine.getOrDefault(o1.getCombine(), 0) + 1);
            Map<String, Integer> combine2 = new HashMap<>(o2.getCombine2());
            combine2.put(o1.getCombine2(), combine2.getOrDefault(o1.getCombine2(), 0) + 1);
            return new Obj2(o1.getA(), combine, combine2);
        });

        mono.subscribe(System.out::println);
    }

}

【讨论】:

    猜你喜欢
    • 2021-03-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-22
    • 2017-06-20
    • 1970-01-01
    • 2021-05-15
    • 1970-01-01
    相关资源
    最近更新 更多