【问题标题】:根据列列表对数据进行分组和聚合
【发布时间】:2022-01-23 12:00:22
【问题描述】:

我有一个 JSONArray,其中包含多个 JSONObject,每个 JSON 对象代表一行数据。 (就像一个 SQL 行)

示例:

[{
    "col1": "c1",
    "col2": "r1",
    "col3": 12121
}, {
    "col1": "c1",
    "col2": "r1",
    "col3": 1321
}, {
    "col1": "c1",
    "col2": "r2",
    "col3": 4342
}, {
    "col1": "c1",
    "col2": "r2",
    "col3": 4532
}]

包含分组依据的列的列表:

示例:

["col1","col2"]

最后,必须应用的聚合,MINMAXSUMAVG 以及必须应用聚合的列:

预期输出:聚合为 SUM

[{
    "col1": "c1",
    "col2": "r1",
    "col3": 13442
},{
    "col1": "c1",
    "col2": "r2",
    "col3": 8874
}]

到目前为止我所尝试的:

每当我看到值发生变化时,我想将当前与以前的列列表进行比较,我对此进行聚合。但是这种方法看起来效率太低了。我正在考虑使用 Java Streams,但我很不擅长。任何帮助将不胜感激。

 if (agg.equalsIgnoreCase("MIN")) {
        Number min = data.getJSONObject(0).getNumber(column);
        for (int i = 0; i < data.length(); i++) {
            JSONObject jsonObject = data.getJSONObject(i);
            if (i > 1) {
            }
        }
    }

【问题讨论】:

    标签: java json java-stream


    【解决方案1】:

    根据您要处理的数据量,一个不依赖流的简单方法是使用Map。对聚合列值进行哈希处理以生成映射键,并根据聚合列的值更新映射值。

    在这里,我创建了一个Operation 接口,可以为每个操作(求和、最大值、最小值等)实现。

    例如

    interface Operation {
        Long update(Long currentAggregate, int nextValue);
    }
    
    class Sum implements Operation {
        @Override
        public Long update(Long currentAggregate, int nextValue) {
            return currentAggregate + nextValue;
        }
    }
    
    JSONArray aggregate(JSONArray array, String[] columns, String aggregateColumn, Operation op) {
        Map<String, Long> aggregates = new HashMap<>();
        for (int i = 0; i < array.size(); ++i) {
            JSONObject obj = array.getJsonObject(i);
            String key = getKey(obj, columns);
            Long current = aggregates.get(key);
            aggregates.put(key, op.update(current, obj.getInt(aggregateColumn)));
        }
        // Then split the map key back out to columns values (or use a more sophisticated 
        // object in place of 'aggregates' that also stores the column values explicitly) and 
        // return a JSONArray with values for the 'aggregateColumn' taken from 'aggregates'.
        // ...
    }
    
    String getKey(JSONObject obj, String[] columns) {
        // Assumes no column names include "_".
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < columns.length; ++i)
            builder.append(obj.getString(columns[i])).append("_");
        return builder.toString();
    }
    

    【讨论】:

    • 非常感谢您的指导。这让我很开心。
    【解决方案2】:

    您必须先解决问题,然后才能意识到这并不难,尤其是当有可以满足您需求的工具时,您甚至不需要自己实现某些东西。 Streams 在这里完全是一个不错的选择,因为 Java 8 Streams API 允许通过键对流式元素进行分组,并将这些组作为下游进行,例如聚合操作。

    假设您有一个 JSON 源生成一个巨大的数据集:对于您的示例,它仍然可以表示为 Stream&lt;JSONObject&gt;。我使用您的文件以流式方式读取它,从而生成可供分析的数据流(我敢打赌,我的拆分器实现并不完美,但它似乎有效):

    public static <T> Stream<T> asStream(final JSONTokener jsonTokener) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED) {
            private Status status = Status.BEFORE_ARRAY;
    
            @Override
            public boolean tryAdvance(final Consumer<? super T> action) {
                for ( ; ; ) {
                    switch ( status ) {
                    case BEFORE_ARRAY:
                        jsonTokener.next('[');
                        status = Status.IN_ARRAY;
                        continue;
                    case IN_ARRAY:
                        switch ( jsonTokener.nextClean() ) {
                        case ',':
                            return true;
                        case ']':
                            status = Status.AFTER_ARRAY;
                            return false;
                        default:
                            jsonTokener.back();
                            @SuppressWarnings("unchecked")
                            final T value = (T) jsonTokener.nextValue();
                            action.accept(value);
                            continue; // or return true?
                        }
                    case AFTER_ARRAY:
                        throw new IllegalStateException();
                    default:
                        throw new AssertionError(status);
                    }
                }
            }
        }, false);
    }
    
    private enum Status {
    
        BEFORE_ARRAY,
        IN_ARRAY,
        AFTER_ARRAY
    
    }
    

    它所做的只是将一些 JSON 令牌流转换为 something 流(因为 org.json 对象模型不建议使用通用基类)。 .如果您已经缓冲了JSONArray,则可以使用此处的内容进行流式传输:Convert Iterable to Stream using Java 8 JDK

    接下来,仅使用上面解析的流中的分组收集器:

    final Collector<JSONObject, ?, Map<List<String>, Double>> collector = Collectors.groupingBy(
            // your groups for (col1. col2)
            row -> List.of(row.getString("col1"), row.getString("col2")),
            // your aggregating SUM for col3
            Collectors.summingDouble(row -> row.getDouble("col3"))
    );
    Assertions.assertEquals(
            Map.of(List.of("c1", "r2"), 8874.0, List.of("c1", "r1"), 13442.0),
            JsonStreams.<JSONObject>asStream(new JSONTokener(reader))
                    .collect(collector)
    );
    

    SUM 就是这样。 AVG结果可以通过Collectors.averagingDouble来完成。

    【讨论】:

    • 我非常喜欢您的流解决方案,但它目前隐藏在相当切线的 JSONTokener -> Stream 转换器代码下。 OP 说他们已经有一个JSONArray,所以也许就从那里开始?
    • @polo-language 这并不重要,因为 OP 可能从一个非常大的 JSON 文件构建了 JSONArray(实际上并没有提到)+ 通常这样的东西永远不会是硬编码的对象所以可能它是从某个地方获取的+我不喜欢为我已经制作的单元测试硬编码的想法。最后,如果 OP 认为缓冲 JSONArray 实例完全没问题,我仍然在答案中注意到这一点:如果您已经缓冲了 JSONArray,则可以使用来自此处:使用 Java 8 JDK 将 Iterable 转换为 Stream
    • 感谢您的回复。
    猜你喜欢
    • 2020-11-11
    • 2013-12-05
    • 1970-01-01
    • 1970-01-01
    • 2021-04-06
    • 2020-08-05
    • 2023-02-07
    • 2015-09-09
    相关资源
    最近更新 更多