【问题标题】:Preprocessing CSV data efficiently before or while parallel streaming在并行流式处理之前或期间有效地预处理 CSV 数据
【发布时间】:2017-09-28 15:19:24
【问题描述】:

我正在寻找一种在将 CSV 数据转储到 Java 流之前(或同时)对其进行预处理的有效方法。

在正常情况下我会做这样的事情来处理文件:

File input = new File("helloworld.csv");
InputStream is = new FileInputStream(input);
BufferedReader br = new BufferedReader(new InputStreamReader(is));
br.lines().parallel().forEach(line -> {
    System.out.println(line);
});

但是,在目前的情况下,我需要在流式传输记录之前或期间对记录进行预处理,并且我的集合中的每个项目都可能依赖于前一个。下面是一个简单的示例 CSV 文件来演示该问题:

species, breed, name
dog, lab, molly
, greyhound, stella
, beagle, stanley
cat, siamese, toby
, persian, fluffy

在我的示例 CSV 中,物种列仅在从记录更改为记录时才会填充。我知道简单的答案是修复我的 CSV 输出,但在这种情况下这是不可能的。

我正在寻找一种合理有效的方法来处理来自 CSV 的记录,如果为空白,则从先前记录中复制物种值,然后在预处理后传递到并行流。

下游处理可能需要很长时间,所以一旦预处理完成,我最终需要并行处理。我的 CSV 文件也可能很大,所以我想避免先将整个文件加载到内存中的对象中。

我希望有某种方法可以执行以下操作(警告错误的伪代码)

parallelStream.startProcessing

while read line {
    if (line.doesntHaveSpecies) {
        line.setSpecies
    }
    parallelStream.add(line)
}

我目前的解决方案是处理整个文件并“修复它”然后流式传输。由于文件可能很大,因此最好在“修复”记录之后和处理整个文件之前立即开始处理记录。

【问题讨论】:

    标签: java csv java-8 java-stream


    【解决方案1】:

    您必须将状态封装成Spliterator

    private static Stream<String> getStream(BufferedReader br) {
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<String>(
                                                100, Spliterator.ORDERED|Spliterator.NONNULL) {
                String prev;
                public boolean tryAdvance(Consumer<? super String> action) {
                    try {
                        String next = br.readLine();
                        if(next==null) return false;
                        final int ix = next.indexOf(',');
                        if(ix==0) {
                            if(prev==null)
                                throw new IllegalStateException("first line without value");
                            next = prev+next;
                        }
                        else prev=ix<0? next: next.substring(0, ix);
                        action.accept(next);
                        return true;
                    } catch (IOException ex) {
                        throw new UncheckedIOException(ex);
                    }
                }
            }, false);
    }
    

    可以用作

    try(Reader r = new FileReader(input);
        BufferedReader br = new BufferedReader(r)) {
    
        getStream(br).forEach(System.out::println);
    }
    

    Spliterator 将始终按顺序遍历。如果打开并行处理,Stream 实现将尝试通过调用trySplit 为其他线程获取新的Spliterator 实例。由于我们无法为该操作提供有效的策略,我们从AbstractSpliterator 继承默认值,它将执行一些基于数组的缓冲。这将始终正常工作,但只有在后续流管道中有大量计算时才会得到回报。否则,您可能只是继续使用顺序执行。

    【讨论】:

    • 我想我会在周末用自定义 Spliterator 回答这个问题,我想现在不会了 :)
    • 对于计算量不一定很繁重的耗时操作(例如访问 REST 端点),您是否会看到同样的好处?
    • 您可能会看到好处,具体取决于 I/O 操作和环境,但 Stream API 使用为计算量身定制的配置,即使用与 CPU 内核数量匹配的目标并发,这可能不会成为特定 I/O 操作的最佳选择。
    【解决方案2】:

    您不能使用并行流启动它,因为它必须按顺序执行才能从前一行获取物种。所以我们可以引入一些副作用映射器:

    final String[] species = new String[1];
    final Function<String, String> speciesAppending = l -> {
        if (l.startsWith(",")) {
            return species[0] + l;
        } else {
            species[0] = l.substring(0, l.indexOf(','));
            return l;
        }
    };
    
    try (Stream<String> stream = Files.lines(new File("helloworld.csv").toPath())) {
        stream.map(speciesAppending).parallel()... // TODO
    }
    

    【讨论】:

    • 那么跟踪/存储在随后调用映射器时要引用的先前物种的最佳方法是什么?我想我要问的是你在哪里定义物种?
    • 是一个简单的String数组,更新了答案放到代码里。
    猜你喜欢
    • 1970-01-01
    • 2019-02-08
    • 2017-02-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多