【问题标题】:What is the easiest way to parallelize a task in java?在java中并行化任务的最简单方法是什么?
【发布时间】:2011-01-02 05:33:51
【问题描述】:

假设我有这样的任务:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

并行化每个 compute() 的最简单方法是什么(假设它们已经可并行化)?

我不需要与上述代码严格匹配的答案,只是一个一般性的答案。但如果您需要更多信息:我的任务是 IO 绑定的,这是针对 Spring Web 应用程序的,这些任务将在 HTTP 请求中执行。

【问题讨论】:

  • 第二行应该是Result result = compute(object);吗?

标签: java multithreading parallel-processing


【解决方案1】:

我建议您查看ExecutorService

特别是这样的:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

请注意,如果 objects 是一个大列表,则使用 newCachedThreadPool 可能会很糟糕。缓存的线程池可以为每个任务创建一个线程!您可能想要使用newFixedThreadPool(n),其中 n 是合理的(例如您拥有的内核数量,假设 compute() 受 CPU 限制)。

以下是实际运行的完整代码:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other exectuors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}

【讨论】:

  • 这个有c#版本吗?
  • 还可以查看 Executors,它作为各种类型的 executor 服务的工厂。
  • @Malfist 在 C# 中有一些任务(对于即将到来的 .net 4 来说很好)使所有这些变得轻而易举:)。并且在 3.5 中有委托/lambdas 和线程、函数、线程​​启动等来完成
  • @Malfist,我知道这是一个旧评论,但 C# 现在有 Parallel.ForEachTask Parallels Library - aka TPL。它们非常完整。
【解决方案2】:

使用 Java8 及更高版本,您可以在集合上使用 parallelStream 来实现此目的:

List<T> objects = ...;

List<Result> result = objects.parallelStream().map(object -> {
            return compute(object);
        }).collect(Collectors.toList());

注意:结果列表的顺序可能与对象列表中的顺序不匹配。

此 stackoverflow 问题how-many-threads-are-spawned-in-parallelstream-in-java-8 中提供了如何设置正确线程数的详细信息

【讨论】:

  • 在我看来这是代码异味。您正在使用 parallelStream 阻止所有其他代码。在测试或小型应用程序中,我可能没问题,但在大型服务器上,这可能是灾难的根源。
  • 流是为数据并行而不是任务并行而设计的。见stackoverflow.com/a/23370799/208288
【解决方案3】:

可以简单地创建几个线程并得到结果。

Thread t = new Mythread(object);

if (t.done()) {
   // get result
   // add result
}

编辑:我认为其他解决方案更酷。

【讨论】:

    【解决方案4】:

    如需更详细的答案,请阅读Java Concurrency in Practice 并使用java.util.concurrent

    【讨论】:

    • 这应该是一个内容伴侣
    【解决方案5】:

    这是我在自己的项目中使用的东西:

    public class ParallelTasks
    {
        private final Collection<Runnable> tasks = new ArrayList<Runnable>();
    
        public ParallelTasks()
        {
        }
    
        public void add(final Runnable task)
        {
            tasks.add(task);
        }
    
        public void go() throws InterruptedException
        {
            final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                    .availableProcessors());
            try
            {
                final CountDownLatch latch = new CountDownLatch(tasks.size());
                for (final Runnable task : tasks)
                    threads.execute(new Runnable() {
                        public void run()
                        {
                            try
                            {
                                task.run();
                            }
                            finally
                            {
                                latch.countDown();
                            }
                        }
                    });
                latch.await();
            }
            finally
            {
                threads.shutdown();
            }
        }
    }
    
    // ...
    
    public static void main(final String[] args) throws Exception
    {
        ParallelTasks tasks = new ParallelTasks();
        final Runnable waitOneSecond = new Runnable() {
            public void run()
            {
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                }
            }
        };
        tasks.add(waitOneSecond);
        tasks.add(waitOneSecond);
        tasks.add(waitOneSecond);
        tasks.add(waitOneSecond);
        final long start = System.currentTimeMillis();
        tasks.go();
        System.err.println(System.currentTimeMillis() - start);
    }
    

    在我的双核盒子上打印了超过 2000 个。

    【讨论】:

      【解决方案6】:

      您可以使用ThreadPoolExecutor。下面是示例代码:http://programmingexamples.wikidot.com/threadpoolexecutor(这里太长了,这里就不放了)

      【讨论】:

        【解决方案7】:

        Fork/Join 的并行数组是一种选择

        【讨论】:

          【解决方案8】:

          我要提到一个执行器类。下面是一些示例代码,您可以放置​​在 executor 类中。

              private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);
          
              private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();
          
              public void addCallable(Callable<Object> callable) {
                  this.callableList.add(callable);
              }
          
              public void clearCallables(){
                  this.callableList.clear();
              }
          
              public void executeThreads(){
                  try {
                  threadLauncher.invokeAll(this.callableList);
                  } catch (Exception e) {
                      // TODO Auto-generated catch block
                      e.printStackTrace();
                  }
              }
          
              public Object[] getResult() {
          
                  List<Future<Object>> resultList = null;
                  Object[] resultArray = null;
                  try {
          
                      resultList = threadLauncher.invokeAll(this.callableList);
          
                      resultArray = new Object[resultList.size()];
          
                      for (int i = 0; i < resultList.size(); i++) {
                          resultArray[i] = resultList.get(i).get();
                      }
          
                  } catch (Exception e) {
                      // TODO Auto-generated catch block
                      e.printStackTrace();
                  }
          
                  return resultArray;
              }
          

          然后要使用它,您需要调用执行器类来填充和执行它。

          executor.addCallable( some implementation of callable) // do this once for each task 
          Object[] results = executor.getResult();
          

          【讨论】:

          • 一组作业没有包装类总是让我很恼火
          【解决方案9】:

          一种巧妙的方法是利用 ExecutorCompletionService。

          假设您有以下代码(如您的示例):

           public static void main(String[] args) {
              List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
              List<List<Character>> list = new ArrayList<>();
          
              for (char letter : letters) {
                List<Character> result = computeLettersBefore(letter);
                list.add(result);
              }
          
              System.out.println(list);
            }
          
            private static List<Character> computeLettersBefore(char letter) {
              return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
            }
          

          现在要并行执行任务,您需要做的就是创建由线程池支持的 ExecutorCompletionService。然后提交任务并阅读结果。由于 ExecutorCompletionService 在后台使用 LinkedBlockingQueue,结果一旦可用就可以获取(如果您运行代码,您会注意到结果的顺序是随机的):

          public static void main(String[] args) throws InterruptedException, ExecutionException {
              final ExecutorService threadPool = Executors.newFixedThreadPool(3);
              final ExecutorCompletionService<List<Character>> completionService = new ExecutorCompletionService<>(threadPool);
          
              final List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
              List<List<Character>> list = new ArrayList<>();
          
              for (char letter : letters) {
                completionService.submit(() -> computeLettersBefore(letter));
              }
          
              // NOTE: instead over iterating over letters again number of submitted tasks can be used as a base for loop
              for (char letter : letters) {
                final List<Character> result = completionService.take().get();
                list.add(result);
              }
          
              threadPool.shutdownNow(); // NOTE: for safety place it inside finally block 
          
              System.out.println(list);
            }
          
            private static List<Character> computeLettersBefore(char letter) {
              return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
            }
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2010-12-29
            • 2017-04-24
            • 1970-01-01
            • 2015-12-06
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多