【问题标题】:want to parallelize a nested for loop using multithreading in java想在java中使用多线程并行化嵌套的for循环
【发布时间】:2017-04-27 09:26:43
【问题描述】:

我想使用执行器服务或使用 java 中的任何其他方法并行化嵌套的 for 循环。我想创建一些固定数量的线程,以便线程不会完全获取 CPU。这里的每个线程都在做一些独立的工作。第一个线程应该执行 j = 0, 20, 30, 40, ... 第二个线程应该执行 j = 1, 21, 31, 41, ... 每个线程都应该并行执行。这是我想做的,

ExecutorService service = Executors.newFixedThreadPool(NoOfThreads);
  for(int i = 0; i < 100; i++) {
    for(int j=0; j < 50000000; j++) {
      //some independent work
      //parallelize this work
    ...

这就是我所做的

for (int i = 0; i < 100; i++) {
  ExecutorService executorService = Executors.newFixedThreadPool(20);
  for (int j=0; j < 50000000; j++) {
    executorService.execute(new Runnable() {
      @Override
      public void run() {
        //do some work
        //send data to some api
      }});
  }
  executorService.shutdown();
  while (!executorService.isTerminated()) {
    System.out.print("");
  }
}

我想确保这是我想做的正确实现。请告诉我如何改进我的代码。

【问题讨论】:

  • 我建议使用并行流并让它处理您的问题。它可能有效,也可能无效,我不确定,但我会测试它。
  • @Shadov 我认为这不是一个很好的建议。简单的道理是:人生没有弯路。尤其是在谈论在编程中使用“并行”抽象时。一个人必须了解你在做什么。只是将“并行流”扔给通常对使用并行事物没有太多线索的人不会有太大帮助。他已经负担过重使用执行器和任务。拉起并行流有什么好处?!

标签: java multithreading parallel-processing nested-loops


【解决方案1】:

嗯,这里有两个明显的问题:

 while (!executorService.isTerminated()) {
   System.out.print("");
 }

将意味着调用此代码的 ma​​in 线程将进行 hot 等待。你应该在这里做一个 Thread.sleep() 调用;避免无缘无故地消耗数以亿计的 CPU 周期。

不过没关系;因为你的代码在这里

for (int i = 0; i < 100; i++) {
  ExecutorService executorService = Executors.newFixedThreadPool(20);

创建 100 * 20 个线程;处理您打算推进的 50000000 个任务。也许我错了;但我有一种直觉,这将使大多数系统超出其极限。

导致:在结构上,这不是一个好方法。创建该服务;并添加任务;和任务内容本身都是不同的东西。

从这个意义上说:

  • 您想创建一个执行器服务(使用多个线程,这些线程在某种程度上类似于底层硬件的功能)
  • 您希望将较少数量的任务推入其中

含义:当您必须处理 100 万个元素时,您不会创建 100 万个任务。你可能会创建 1000 个任务;每个工作 1000 个元素。

【讨论】:

    【解决方案2】:

    您的代码分配了 50000000 个 Runnable 对象,这会很慢。 相反,每个工作线程都应该运行一个循环来获取下一个要处理的“j”值。使用 AtomicInteger 确保每个“j”值都被处理一次。

    这是一些大纲代码:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ParallelLoop {
        public static void main(String[] args) throws InterruptedException {
    
            for (int i = 0; i < 10; i++) {
                final int n = 50000000;
                final AtomicInteger atomicJ = new AtomicInteger();
    
                int nThread = 20;
                ExecutorService es = Executors.newFixedThreadPool(nThread);
                for (int t = 0; t < nThread; t++) {
                    Runnable r = new Runnable() {
                        public void run() {
                            while (true) {
                                int j = atomicJ.getAndIncrement();
                                if (j >= n)
                                    return;
                                // Process J ....
                            }
                        }
                    };
                    es.submit(r);
                }
                es.shutdown();
                es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                System.out.println("====");
            }
        }
    
    }
    

    【讨论】:

      猜你喜欢
      • 2019-03-09
      • 2017-09-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多