【发布时间】:2021-06-12 12:49:03
【问题描述】:
使用
ExecutorService.execute() 提交所有任务时如何等待它们完成。有一个函数叫做 awaitTermination
但是必须在其中提供超时。这不能保证当这个
返回所有任务将已完成。有没有办法做到这一点?
【问题讨论】:
标签: java multithreading parallel-processing executorservice
使用
ExecutorService.execute() 提交所有任务时如何等待它们完成。有一个函数叫做 awaitTermination
但是必须在其中提供超时。这不能保证当这个
返回所有任务将已完成。有没有办法做到这一点?
【问题讨论】:
标签: java multithreading parallel-processing executorservice
有几种方法。
您可以先调用ExecutorService.shutdown,然后再调用ExecutorService.awaitTermination,它会返回:
如果此执行程序终止,则为 true,如果超时,则为 false 终止前
所以:
有一个函数叫做 awaitTermination 但是必须有一个超时 其中提供。这并不能保证当它返回所有 任务就完成了。有没有办法做到这一点?
您只需循环调用awaitTermination。
使用 awaitTermination
此实现的完整示例:
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException {
final int total_threads = 4;
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100));
}
int count = 0;
// This is the relevant part
// Chose the delay most appropriate for your use case
executor.shutdown();
while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
System.out.println("Waiting "+ count);
count++;
}
}
private static Runnable parallelWork(long sleepMillis) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
};
}
}
使用 CountDownLatch
另一种选择是创建一个CountDownLatch,其中count 等于并行任务的数量。每个线程调用countDownLatch.countDown();,而main线程调用countDownLatch.await();。
此实现的完整示例:
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException {
final int total_threads = 4;
CountDownLatch countDownLatch = new CountDownLatch(total_threads);
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100, countDownLatch));
}
countDownLatch.await();
System.out.println("Exit");
executor.shutdown();
}
private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
countDownLatch.countDown();
};
}
}
使用循环障碍
另一种方法是使用Cyclic Barrier
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
final int total_threads = 4;
CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100, barrier));
}
barrier.await();
System.out.println("Exit");
executor.shutdown();
}
private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// Do something
}
};
}
}
还有其他方法,但这些方法需要更改您的初始要求,即:
提交后如何等待所有任务完成 使用 ExecutorService.execute() 。
【讨论】:
如果您使用类ThreadPoolExecutor 或其任何子类,您有一个方法getActiveCount() 返回正在执行任务的线程数。因此,您可以轮询该方法,直到它变为 0,这意味着所有任务都已完成并且当前没有新任务正在执行。但是,如果某些任务卡住了怎么办?我认为您还必须给出一些超时时间,以防止在这种情况下出现无限循环。
这个想法最大的好处是不需要调用shutdown方法
【讨论】:
如果您阅读ExecutorService.awaitTermination 的javadoc(或查看方法签名),您会看到它返回boolean。此布尔值指示 Executor 是否终止。您可以使用该信息创建while 循环以确定它是否已终止。
ExecutorService executor = ...
executor.shutdown(); // close the executor and don't accept new tasks
while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS) {}
这样的事情会停止执行器并等待它终止并且所有任务都完成。
【讨论】:
execute 方法不返回任何内容。您可以使用返回 Future 类型结果的 submit 方法。
Future<String> future =
executorService.submit(callableTask/runnableTask);
【讨论】: