【问题标题】:How to select max item per group from Flux如何从 Flux 中选择每个组的最大项目
【发布时间】:2021-08-18 22:38:40
【问题描述】:

鉴于以下MyObjectFlux<MyObject>,从该助焊剂中删除具有相同属性的MyObjects 的最佳方法是什么?

    import lombok.Data;
    import reactor.core.publisher.Flux;
    
    public class Example {
    
        @Data
        public class MyObject {
            final String name;
            final int priority;
        }
    
        public Example() {
            Flux<MyObject> myFlux = Flux.just(
                    new MyObject("abc", 2),
                    new MyObject("abc", 4),
                    new MyObject("cde", 1));
        }
    }

例如,我想删除具有相同name 的对象,同时选择具有更高priority 的对象。
输出:[Example.MyObject(name=abc, priority=4), Example.MyObject(name=cde, priority=1)]

如果我使用myFlux.distinct(MyObject::getName),我将无法选择保留哪一个。

【问题讨论】:

    标签: java reactive-programming spring-webflux project-reactor


    【解决方案1】:

    要解决这个问题,您首先需要将此Flux&lt;MyObject&gt; 转换为Mono&lt;List&lt;MyObject&gt;&gt;,因为您需要知道所有对象及其优先级才能对它们进行排序。

    一旦你有了MyObject的所有实例的列表,你就可以使用Java 8 Stream api来解决这个问题:

    @Slf4j
    public class Example {
    
        public static void main(String[] args) {
            Flux<MyObject> myFlux = Flux.just(
                            new MyObject("abc", 2),
                            new MyObject("abc", 4),
                            new MyObject("cde", 1))
                    .collectList()
                    .map(myObjectsList -> myObjectsList.stream()
                            .collect(Collectors
                                    .groupingBy(MyObject::getName)))
                    // now we have a Map<String, List<MyObject>>
                    .map(Map::entrySet)
                    // now we have a Set<Entry<String, List<MyObject>>>
                    .flatMapIterable(entrySet -> entrySet)
                    .map(Map.Entry::getValue)
                    // now we have a Flux<List<MyObject>>
                    // and all MyObject in that list have 
                    // the same name
                    .filter(allObjectsWithSameName -> !allObjectsWithSameName.isEmpty())
                    // now we sort all the lists in descending order
                    // and return the first element
                    // which is the one with the highest prio
                    .map(allObjectsWithSameName -> {
                                allObjectsWithSameName.sort(new Comparator<MyObject>() {
                                    @Override
                                    public int compare(MyObject o1, MyObject o2) {
                                        return Integer.compare(o2.priority, o1.priority);
                                    }
                                });
                                return allObjectsWithSameName.get(0);
                            }
                    );
    
            myFlux.subscribe(result -> System.out.println("MyObject: " + result.toString()));
        }
    
        @Data
        @RequiredArgsConstructor
        public static class MyObject {
            final String name;
            final int priority;
        }
    }
    
    

    输出:

    MyObject: Example.MyObject(name=abc, priority=4)
    MyObject: Example.MyObject(name=cde, priority=1)
    

    【讨论】:

      【解决方案2】:

      您可以在Flux 上使用groupByreduce 运算符实现此目的:

      Flux.just(
              new MyObject("abc", 2),
              new MyObject("abc", 4),
              new MyObject("cde", 1))
          .groupBy(MyObject::getName)
          .flatMap(group -> group.reduce((o1, o2) -> o1.getPriority() > o2.getPriority() ? o1 : o2))
          .subscribe(System.out::println);
      

      一个重要的考虑是,这仅在组数较少的情况下才有效,否则可能导致死锁。作为补救措施,您可以将flatMapmaxConcurrency 参数设置为更高的值。

      documentation of groupBy operator:

      groupBy 需要在下游排出和消耗这些组 正常工作。尤其是当标准产生大量 团体,如果团体没有得到适当的消费,它可能会导致挂起 下游(例如,由于带有 maxConcurrency 参数的 flatMap 设置得太低)。

      【讨论】:

        猜你喜欢
        • 2013-06-24
        • 1970-01-01
        • 2011-11-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-10-14
        • 2015-12-09
        • 1970-01-01
        相关资源
        最近更新 更多