【问题标题】:Java - Multithreading one big loopJava - 多线程一个大循环
【发布时间】:2013-11-13 23:23:21
【问题描述】:

这可能是一个非常简单的问题,但由于我从未使用过线程,因此我认为最好是问而不是试图完全自己找到最佳解决方案。

我有一个巨大的 for 循环,它运行了数十亿次。在每次on循环运行时,程序根据当前的index,计算出一个数字形式的最终结果。我只对存储前result(或前x 个结果)及其相应索引感兴趣。

我的问题很简单,在线程中运行这个循环的正确方法是什么,以便它使用所有可用的 CPU/内核。

int topResultIndex;
double topResult = 0;

for (i=1; i < 1000000000; ++i) {
    double result = // some complicated calculation based on the current index
    if (result > topResult) {
        topResult = result;
        topResultIndex = i;
    }
}

每个索引的计算完全独立,不共享资源。 topResultIndextopResult 显然会被每个线程访问。

* 更新:Giulio 和 rolfl 的解决方案都很好,也非常相似。只能接受其中一个作为我的答案。

【问题讨论】:

  • 每个索引的计算是独立的还是要共享资源进行计算?
  • 每个索引的计算完全独立
  • 如果循环受 CPU 限制,多线程提高其速度(通过阿姆达尔定律给出的系数)。如果瓶颈是内存,它就行不通(因为多线程不会使 RAM 运行得更快)
  • 好吧,当我在双核 CPU 上运行循环时,我看到 CPU 使用率正好上升到 50%,所以我可以假设 CPU 确实是瓶颈。 Ofc,只有在线程中实际运行后才能确定。

标签: java multithreading loops multiprocessing threadpool


【解决方案1】:

最简单的方法是使用ExecutorService 并将您的任务提交为RunnableCallable。您可以使用Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) 创建一个ExeuctorService,它将使用与处理器数量相同的线程数。

【讨论】:

  • 好的,但我的“任务”是什么。我应该只划分循环吗?例如,如果我在 2 个处理器上,一个会从 0 运行到 500000000,另一个从 500000001 运行到 1000000000?还是有更合适的解决方案?或者你的意思是每个循环运行本身都是一个新的可运行对象? (创建这么多对象听起来不太聪明)
  • 您的任务是漫长而复杂的计算。创建一个类似getResult(int index) 的方法,并将其用作您的任务。
【解决方案2】:

除了观察到带有 OpenMP 或其他一些并行计算扩展的 C 程序会是一个更好的主意之外,Java 的方法是创建一个计算问题子集的“未来”任务:

private static final class Result {
   final int index;
   final double result;
   public Result (int index, double result) {
       this.result = result;
       this.index = index;
   }
}

// Calculate 10,000 values in each thead
int steps = 10000;
int cpucount = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(cpucount);
ArrayList<Future<Result>> results = new ArrayList<>();
for (int i = 0; i < 1000000000; i+= steps) {
    final int from = i;
    final int to = from + steps;
    results.add(service.submit(new Callable<Result>() {
        public Result call() {
              int topResultIndex = -1;
              double topResult = 0;
              for (int j = from; j < to; j++) {
                  // do complicated things with 'j'
                      double result = // some complicated calculation based on the current index
                      if (result > topResult) {
                          topResult = result;
                          topResultIndex = j;
                      }
              }
              return new Result(topResultIndex, topResult);
        }
    });
 }

 service.shutdown();
 while (!service.isTerminated()) {
     System.out.println("Waiting for threads to complete");
     service.awaitTermination(10, TimeUnit.SECONDS);
 }
 Result best = null;
 for (Future<Result> fut : results) {
    if (best == null || fut.result > best.result) {
       best = fut;
    }
 }

 System.out.printf("Best result is %f at index %d\n", best.result, best.index);

Future<Result>

【讨论】:

  • 非常感谢您的深入解决方案。你认为这会在 C/C++ 中运行得更好吗?我的整个程序已经用 java 编写了,但我实际上考虑过用 c++ 编写程序的这一部分,如果它确实会显示出显着的改进。
  • 其实调用Runtime.getRuntime().availableProcessors()会返回没有任何外部库的占有者数量
  • 已编辑,感谢 vandale。 @SportySpice - 这取决于情况。 Java JIT 可能能够在您的特定情况下展开激烈竞争。
【解决方案3】:

假设结果是由calculateResult(long) 方法计算的,该方法是私有的和静态的,并且不访问任何静态字段,(它也可以是非静态的,但它仍然必须是线程安全和并发的- 可执行,希望是线程限制的)。

那么,我认为这会做脏活:

public static class Response {
    int index;
    double result;
}

private static class MyTask implements Callable<Response> {
    private long from;
    private long to;

    public MyTask(long fromIndexInclusive, long toIndexExclusive) {
        this.from = fromIndexInclusive;
        this.to = toIndexExclusive;
    }

    public Response call() {
        int topResultIndex;
        double topResult = 0;

        for (long i = from; i < to; ++i) {
            double result = calculateResult(i);
            if (result > topResult) {
                topResult = result;
                topResultIndex = i;
            }
        }

        Response res = new Response();
        res.index = topResultIndex;
        res.result = topResult;
        return res;
    }
};

private static calculateResult(long index) { ... }

public Response interfaceMethod() {
    //You might want to make this static/shared/global
    ExecutorService svc = Executors.newCachedThreadPool();

    int chunks = Runtime.getRuntime().availableProcessors();
    long iterations = 1000000000;
    MyTask[] tasks = new MyTask[chunks];
    for (int i = 0; i < chunks; ++i) {
        //You'd better cast to double and round here
        tasks[i] = new MyTask(iterations / chunks * i, iterations / chunks * (i + 1));
    }

    List<Future<Response>> resp = svc.invokeAll(Arrays.asList(tasks));
    Iterator<Future<Response>> respIt = resp.iterator();

    //You'll have to handle exceptions here
    Response bestResponse = respIt.next().get();

    while (respIt.hasNext()) {
        Response r = respIt.next().get();
        if (r.result > bestResponse.result) {
            bestResponse = r;
        }
    }

    return bestResponse;
}

根据我的经验,这种分块比为每个索引分配一个任务要快得多(特别是如果每​​个单个索引的计算负载很小,就像它可能那样。小,我的意思是不到半秒) .但是,编码有点困难,因为您需要进行两步最大化(首先在块级别,然后在全局级别)。有了这个,如果计算完全基于 cpu(不会过多地推动 ram),您应该获得几乎等于物理内核数量 80% 的加速。

【讨论】:

  • calculateResult() 包含在网络请求中的情况如何,该请求涉及一些等待(例如一两秒)。你会为每个索引分配一个任务吗?
  • 仍然取决于您需要发出多少请求。一般来说,我会在任务和索引之间建立 1:1 的关系,但你不想让等待线程吞噬你的操作系统。在普通台式机/笔记本电脑上,对于网络或硬盘请求,我会避免一次调用产生超过 20-30 个线程。还要考虑到您需要等待的设备有其自身的限制,向其提出过多的请求将无济于事。无论如何,当任务是 IO-bound 时,任务的数量可能远远大于 CPU 的数量
猜你喜欢
  • 2013-12-24
  • 1970-01-01
  • 2023-01-06
  • 1970-01-01
  • 2012-03-28
  • 1970-01-01
  • 2017-06-13
  • 2021-09-21
  • 2012-10-28
相关资源
最近更新 更多