【发布时间】:2018-06-21 02:22:54
【问题描述】:
我有一个问题想使用 Java 的 ExecutorService 和 Future 类来解决。我目前正在使用 for 循环从一个对我来说计算成本非常高的函数中获取许多样本(每个样本可能需要几分钟)。我有一个类FunctionEvaluator 为我评估这个函数,并且这个类的实例化非常昂贵,因为它包含大量内部存储器,所以我通过一些内部计数器和一个 reset() 方法使这个类易于重用。所以我现在的情况是这样的:
int numSamples = 100;
int amountOfData = 1000000;
double[] data = new double[amountOfData];//Data comes from somewhere...
double[] results = new double[numSamples];
//a lot of memory contained inside the FunctionEvaluator class,
//expensive to intialise
FunctionEvaluator fe = new FunctionEvaluator();
for(int i=0; i<numSamples; i++) {
results[i] = fe.sampleAt(i, data);//very expensive computation
}
但我想要一些多线程来加快速度。这应该很容易,因为虽然每个样本都将共享data 内部的任何内容,但它是一个只读操作,每个样本都独立于其他样本。现在我不会有任何麻烦,因为我以前使用过 Java 的 Future 和 ExecutorService,但从来没有在必须重新使用 Callable 的情况下。所以总的来说,如果我有能力运行n FunctionEvaluator 的实例化,我将如何设置这个场景?像这样(非常粗略)的东西:
int numSamples = 100;
int amountOfData = 1000000;
int N = 10;
double[] data = new double[amountOfData];//Data comes from somewhere...
double[] results = new double[numSamples];
//a lot of memory contained inside the FunctionEvaluator class,
//expensive to intialise
FunctionEvaluator[] fe = new FunctionEvaluator[N];
for(int i=0; i<numSamples; i++) {
//Somehow add available FunctionEvaluators to an ExecutorService
//so that N FunctionEvaluators can run in parallel. When a
//FunctionEvaluator is finished, reset then compute a new sample
//until numSamples samples have been taken.
}
任何帮助将不胜感激!非常感谢。
编辑
所以这是一个玩具示例(不起作用:P)。在这种情况下,我想要采样的“昂贵的函数”只是对一个整数进行平方,而为我做的“昂贵的实例化类”称为CallableComputation:
在 TestConc.java 中:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class TestConc {
public static void main(String[] args) {
SquareCalculator squareCalculator = new SquareCalculator();
int numFunctionEvaluators = 2;
int numSamples = 10;
ExecutorService executor = Executors.newFixedThreadPool(2);
CallableComputation c1 = new CallableComputation(2);
CallableComputation c2 = new CallableComputation(3);
CallableComputation[] callables = new CallableComputation[numFunctionEvaluators];
Future<Integer>[] futures = (new Future[numFunctionEvaluators]);
int[] results = new int[numSamples];
for(int i=0; i<numFunctionEvaluators; i++) {
callables[i] = new CallableComputation(i);
futures[i] = executor.submit(callables[i]);
}
futures[0] = executor.submit(c1);
futures[1] = executor.submit(c2);
for(int i=numFunctionEvaluators; i<numSamples; ) {
for(int j=0; j<futures.length; j++) {
if(futures[j].isDone()) {
try {
results[i] = futures[j].get();
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (ExecutionException e) {
e.printStackTrace();
}
callables[j].set(i);
System.out.printf("Function evaluator %d given %d\n", j, i+1);
executor.submit(callables[j]);
i++;
}
}
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.MINUTES);
}
catch (InterruptedException e) {
e.printStackTrace();
}
for (int i=0; i<results.length; i++) {
System.out.printf("res%d=%d, ", i, results[i]);
}
System.out.println();
}
private static boolean areDone(Future<Integer>[] futures) {
for(int i=0; i<futures.length; i++) {
if(!futures[i].isDone()) {
return false;
}
}
return true;
}
private static void printFutures(Future<Integer>[] futures) {
for (int i=0; i<futures.length; i++) {
System.out.printf("f%d=%s | ", i, futures[i].isDone()?"done" : "not done");
}System.out.printf("\n");
}
}
在 CallableComputation.java 中:
import java.util.concurrent.Callable;
public class CallableComputation implements Callable<Integer>{
int input = 0;
public CallableComputation(int input) {
this.input = input;
}
public void set(int i) {
input = i;
}
@Override
public Integer call() throws Exception {
System.out.printf("currval=%d\n", input);
Thread.sleep(500);
return input * input;
}
}
【问题讨论】:
标签: java multithreading concurrency future executorservice