【问题标题】:Optimizing parallel processing of many files优化多个文件的并行处理
【发布时间】:2012-07-19 16:04:30
【问题描述】:

我有一个程序处理大量文件,其中每个文件需要做两件事:首先,读取和处理文件的一部分,然后存储生成的MyFileData。第一部分可以并行,第二部分不能。

按顺序做所有事情都很慢,因为CPU必须等待磁盘,然后它会工作一点,然后发出另一个请求,然后再次等待......

我做了以下

class MyCallable implements Callable<MyFileData> {
    MyCallable(File file) {
        this.file = file;
    }
    public MyFileData call() {
        return someSlowOperation(file);
    }
    private final File file;
}

for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
for (Future<MyFileData> f : futures) sequentialOperation(f.get());

它帮助很大。不过,我想改进两点:

  • sequentialOperation 以固定顺序执行,而不是先处理任何可用的结果。如何更改?

  • 有数千个文件需要处理,启动数千个磁盘请求可能会导致磁盘垃圾。通过使用Executors.newFixedThreadPool(10),我限制了这个数字,但我正在寻找更好的东西。理想情况下,它应该是自我调整的,以便在不同的计算机上运行最佳(例如,当RAID 和/或NCQ 可用时发出更多请求等)。我不认为它可以基于找出硬件配置,但测量处理速度并基于它进行优化应该以某种方式是可能的。有什么想法吗?

【问题讨论】:

  • 我相信“Chii”有答案:继续您的并行操作,但将这些结果放入队列(更适合磁盘 IO 的串行性质)以进行磁盘写入。

标签: java multithreading parallel-processing executorservice


【解决方案1】:

sequentialOperation 以固定顺序执行,而不是先处理任何可用的结果。怎么改?

这正是 CompletionService 所做的:它并行处理任务并在任务完成时返回它们,而不管提交顺序如何。

简化(未测试)示例:

int NUM_THREADS = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);

for (File f : files) futures.add(completionService.submit(new MyCallable(f)));

for(int i = 0; i < futures.size(); i++) {
    Future<MyFileData> next = completionService.take();
    sequentialOperation(next.get());
}

有数千个文件需要处理,启动数千个磁盘请求可能会导致磁盘垃圾。通过使用 Executors.newFixedThreadPool(10) 我限制了这个数字,但是我正在寻找更好的东西。

我不是 100% 确定那个。我想这取决于你有多少磁盘,但我认为磁盘访问部分不应该分成太多线程(每个磁盘一个线程可能是明智的):如果多个线程同时访问一个磁盘,它会花更多的时间寻找而不是阅读。

【讨论】:

  • 同时问这两个问题是个坏主意。也许我会将磁盘部分移到一个新问题中。 CompletionService 是更简单的部分的最简单的解决方案,并且可以立即工作。
【解决方案2】:

sequentialOperation 以固定顺序执行,而不是先处理任何可用的结果。怎么改?

假设:每个someSlowOperation(file); 调用都将花费不同的时间,因此,您希望在收到MyFileData 后立即处理它,而不是与另一个sequentialOperation 同时处理。

您可以通过设置生产者/消费者队列来实现此目的。

生产者是您在示例中执行的callables,其中添加的位是您将结果添加到等待处理的工作队列中。

Consumer 是 sequentialOperation() 调用 - 它在自己的线程中运行,并且只有一个。这个线程所做的只是获取队列的头部,并处理它,重复直到程序结束。

这样,您可以最大限度地利用机器上的所有资源。

带有一些示例代码的相关帖子:Producer/Consumer threads using a Queue

编辑:我想你可能想要一个快速的样本,因为它对以前从未做过的人来说非常不透明

public class Main {

    private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10);
    private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1);
    private final LinkedBlockingQueue<MyData> queue = new LinkedBlockingQueue();//or some other impl

    abstract class Producer implements Runnable{
        private final File file;
        Producer(File file) {
            this.file = file;
        }

        public void run() {
            MyData result = someLongAssOperation(file);
            queue.offer(result);
        }

        public abstract void someLongAssOperation(File file);
    }

    abstract class Consumer implements Runnable {
        public void run() {
            while (true) {
                sequentialOperation(queue.take());  
            }
        }

        public abstract void sequentialOperation(MyData data);
    } 

    private void start() {
        consumerExecutor.submit(new Consumer(){
            //implement sequentialOperation here
        });

        for (File f : files) {
            producerExecutor.submit(new Producer(file) {
                //implement the someLongAssOperation()
            });
        }

    }

    public static void main(String[] args) {
        new Main().start();     
    } 

}

【讨论】:

  • 相当复杂,但很高兴知道。现在我坚持使用另一个答案中的CompletionService,因为它的工作量要少得多。也许我需要稍后运行我自己的队列,让我们看看。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-01-13
  • 2011-07-04
  • 1970-01-01
  • 1970-01-01
  • 2016-09-18
  • 1970-01-01
  • 2016-05-26
相关资源
最近更新 更多