【问题标题】:Make expensive for loop multithreaded, Java使循环多线程,Java昂贵
【发布时间】:2018-06-21 02:22:54
【问题描述】:

我有一个问题想使用 Java 的 ExecutorServiceFuture 类来解决。我目前正在使用 for 循环从一个对我来说计算成本非常高的函数中获取许多样本(每个样本可能需要几分钟)。我有一个类FunctionEvaluator 为我评估这个函数,并且这个类的实例化非常昂贵,因为它包含大量内部存储器,所以我通过一些内部计数器和一个 reset() 方法使这个类易于重用。所以我现在的情况是这样的:

int numSamples = 100;
int amountOfData = 1000000;
double[] data = new double[amountOfData];//Data comes from somewhere...
double[] results = new double[numSamples];
//a lot of memory contained inside the FunctionEvaluator class,
//expensive to intialise
FunctionEvaluator fe = new FunctionEvaluator();

for(int i=0; i<numSamples; i++) {
    results[i] = fe.sampleAt(i, data);//very expensive computation
}

但我想要一些多线程来加快速度。这应该很容易,因为虽然每个样本都将共享data 内部的任何内容,但它是一个只读操作,每个样本都独立于其他样本。现在我不会有任何麻烦,因为我以前使用过 Java 的 FutureExecutorService,但从来没有在必须重新使用 Callable 的情况下。所以总的来说,如果我有能力运行n FunctionEvaluator 的实例化,我将如何设置这个场景?像这样(非常粗略)的东西:

int numSamples = 100;
int amountOfData = 1000000;
int N = 10;

double[] data = new double[amountOfData];//Data comes from somewhere...
double[] results = new double[numSamples];
//a lot of memory contained inside the FunctionEvaluator class,
//expensive to intialise
FunctionEvaluator[] fe = new FunctionEvaluator[N];

for(int i=0; i<numSamples; i++) {
    //Somehow add available FunctionEvaluators to an ExecutorService
    //so that N FunctionEvaluators can run in parallel. When a 
    //FunctionEvaluator is finished, reset then compute a new sample
    //until numSamples samples have been taken.
}

任何帮助将不胜感激!非常感谢。

编辑

所以这是一个玩具示例(不起作用:P)。在这种情况下,我想要采样的“昂贵的函数”只是对一个整数进行平方,而为我做的“昂贵的实例化类”称为CallableComputation

在 TestConc.java 中:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TestConc {

    public static void main(String[] args) {
        SquareCalculator squareCalculator = new SquareCalculator();
        int numFunctionEvaluators = 2;
        int numSamples = 10;

        ExecutorService executor = Executors.newFixedThreadPool(2);
        CallableComputation c1 = new CallableComputation(2);
        CallableComputation c2 = new CallableComputation(3);

        CallableComputation[] callables = new CallableComputation[numFunctionEvaluators];
        Future<Integer>[] futures = (new Future[numFunctionEvaluators]);
        int[] results = new int[numSamples];

        for(int i=0; i<numFunctionEvaluators; i++) {
            callables[i] = new CallableComputation(i);
            futures[i] = executor.submit(callables[i]);
        }

        futures[0] = executor.submit(c1);
        futures[1] = executor.submit(c2);

        for(int i=numFunctionEvaluators; i<numSamples; ) {
            for(int j=0; j<futures.length; j++) {
                if(futures[j].isDone()) {
                    try {
                        results[i] = futures[j].get();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    callables[j].set(i);
                    System.out.printf("Function evaluator %d given %d\n", j, i+1);
                    executor.submit(callables[j]);
                    i++;
                }
            }
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i=0; i<results.length; i++) {
            System.out.printf("res%d=%d, ", i, results[i]);
        }
        System.out.println();
    }

    private static boolean areDone(Future<Integer>[] futures) {
        for(int i=0; i<futures.length; i++) {
            if(!futures[i].isDone()) {
                return false;
            }
        }
        return true;
    }

    private static void printFutures(Future<Integer>[] futures) {
        for (int i=0; i<futures.length; i++) {
            System.out.printf("f%d=%s | ", i, futures[i].isDone()?"done" : "not done");
        }System.out.printf("\n");
    }

}

在 CallableComputation.java 中:

import java.util.concurrent.Callable;

public class CallableComputation implements Callable<Integer>{

    int input = 0;

    public CallableComputation(int input) {
        this.input = input;
    }

    public void set(int i) {
        input = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.printf("currval=%d\n", input);
        Thread.sleep(500);
        return input * input;
    }
}

【问题讨论】:

    标签: java multithreading concurrency future executorservice


    【解决方案1】:

    在 Java8 中:

    double[] result = IntStream.range(0, numSamples)
        .parallel()
        .mapToDouble(i->fe.sampleAt(i, data))
        .toArray();
    

    问题询问如何通过加载尽可能多的 CPU 来并行执行繁重的计算功能。

    来自Parallelism tutorial

    并行计算涉及将问题分解为子问题, 同时解决这些问题(并行,每个 子问题在单独的线程中运行),然后结合 子问题解的结果。 Java SE 提供 fork/join 框架,让你更容易实现 应用程序中的并行计算。然而,有了这个框架, 您必须指定问题如何细分(分区)。和 聚合操作,Java 运行时执行此分区并 为您组合解决方案。

    实际解决方案包括:

    • IntStream.range 将生成从 0 到 numSamples 的整数流。

    • parallel() 将拆分流并在机器上所有可用的 CPU 上执行它。

    • mapToDouble() 将通过应用将执行实际工作的 Lamba 表达式将整数流转换为双精度流。

    • toArray() 是一个终端操作,它将聚合结果并将其作为数组返回。

    【讨论】:

    • 谢谢你,格雷。感谢您的反馈。
    • 对,这一切现在对我来说都有意义,但就我目前所做的而言,我需要在每次调用 fe.sample(); 时调用 fe.reset(); 以重置 fe 的内存对象包含,因为我发现每次我想采样时重新实例化所有内存太昂贵了。请记住,这只是一个玩具示例,我想看看如何并行化一般的 for 循环,我评估的实际函数将数千个数据点作为参数,而不仅仅是一个整数。
    【解决方案2】:

    不需要特殊的代码更改,您可以一次又一次地使用相同的 Callable 而不会出现任何问题。另外,为了提高效率,正如您所说,创建 FunctionEvaluator 的实例很昂贵,您可以只使用一个实例并确保 sampleAt 是线程安全的。一种选择是,也许您可​​以使用所有函数局部变量,并且在任何线程运行时不要在任何时间点修改任何传递参数

    请在下面找到一个简单的示例:

    代码片段:

    ExecutorService executor = Executors.newFixedThreadPool(2);
    Callable<String> task1 = new Callable<String>(){public String call(){System.out.println(Thread.currentThread()+"currentThread");return null;}}
    executor.submit(task1);
    executor.submit(task1);
    executor.shutdown();
    

    请在下面找到截图:

    【讨论】:

    • 嗯,我真的不明白这如何解决我只使用几个实例的问题。因为如果我只使用一个实例并使其成为线程安全的,那么我将处于完全相同的情况下,即串行执行计算。我想这个想法是让 N 个实例化来获取 M>N 个样本。我添加了一个我想要的工作示例,也许你可以澄清一下?谢谢!
    • @MrSquid 这取决于在多线程环境中执行时真正导致问题的代码段。此外,新的代码正是我建议你做的,除了一件事,当同一个实例可以在多个线程上执行时,你为什么要创建多个 CallableComputation 实例。
    【解决方案3】:

    您可以将每个 FunctionEvaluator 的实际工作包装为 Callable/Runnanle,然后使用带有队列的 fixdThreadPool,然后您只需要将目标 callable/runnable 汇总到 threadPool。

    【讨论】:

      【解决方案4】:

      我想要一些多线程来加快速度。

      听起来不错,但您的代码过于复杂。 @Pavel 有一个非常简单的 Java 8 解决方案,但即使没有 Java 8,您也可以让它变得更容易。

      您需要做的就是将作业提交给执行程序,然后在返回的每个Futures 上调用get()。不需要Callable 类,尽管它确实使代码更清晰。但是你当然不需要那些模式不好的数组,因为拼写错误很容易产生越界异常。坚持使用集合或 Java 8 流。

      ExecutorService executor = Executors.newFixedThreadPool(2);
      List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
      for (int i = 0; i < numSamples; i++ ) {
          // start the jobs running in the background
          futureList.add(executor.subject(new CallableComputation(i));
      }
      // shutdown executor if done submitting tasks, submitted jobs will keep running
      executor.shutdown();
      for (Future<Integer> future : futureList) {
          // this will wait for the future to finish, it also throws some exceptions
          Integer result = future.get();
          // add result to a collection or something here
      }
      

      【讨论】:

        猜你喜欢
        • 2014-03-08
        • 2017-12-08
        • 2011-12-06
        • 1970-01-01
        • 2013-12-24
        • 2016-01-31
        • 2013-10-18
        • 2012-05-17
        • 2010-10-08
        相关资源
        最近更新 更多