【问题标题】:Wait for all tasks in ExecutorService.execute to complete等待 ExecutorService.execute 中的所有任务完成
【发布时间】:2021-06-12 12:49:03
【问题描述】:

使用
ExecutorService.execute() 提交所有任务时如何等待它们完成。有一个函数叫做 awaitTermination
但是必须在其中提供超时。这不能保证当这个
返回所有任务将已完成。有没有办法做到这一点?

【问题讨论】:

    标签: java multithreading parallel-processing executorservice


    【解决方案1】:

    有几种方法。

    您可以先调用ExecutorService.shutdown,然后再调用ExecutorService.awaitTermination,它会返回:

    如果此执行程序终止,则为 true,如果超时,则为 false 终止前

    所以:

    有一个函数叫做 awaitTermination 但是必须有一个超时 其中提供。这并不能保证当它返回所有 任务就完成了。有没有办法做到这一点?

    您只需循环调用awaitTermination

    使用 awaitTermination

    此实现的完整示例:

    public class WaitForAllToEnd {
    
        public static void main(String[] args) throws InterruptedException {
            final int total_threads = 4;
            ExecutorService executor = Executors.newFixedThreadPool(total_threads);
            for(int i = 0; i < total_threads; i++){
                executor.execute(parallelWork(100 + i * 100));
            }
    
            int count = 0;
    
            // This is the relevant part
            // Chose the delay most appropriate for your use case
            executor.shutdown();
            while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                System.out.println("Waiting "+ count);
                count++;
            }
        }
    
        private static Runnable parallelWork(long sleepMillis) {
            return () -> {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Do Something
                }
                System.out.println("I am Thread : " + Thread.currentThread().getId());
            };
        }
    }
    

    使用 CountDownLatch

    另一种选择是创建一个CountDownLatch,其中count 等于并行任务的数量。每个线程调用countDownLatch.countDown();,而main线程调用countDownLatch.await();

    此实现的完整示例:

    public class WaitForAllToEnd {
    
        public static void main(String[] args) throws InterruptedException {
            final int total_threads = 4;
            CountDownLatch countDownLatch = new CountDownLatch(total_threads);
            ExecutorService executor = Executors.newFixedThreadPool(total_threads);
            for(int i = 0; i < total_threads; i++){
                executor.execute(parallelWork(100 + i * 100, countDownLatch));
            }
            countDownLatch.await();
            System.out.println("Exit");
            executor.shutdown();
        }
    
        private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
            return () -> {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Do Something
                }
                System.out.println("I am Thread : " + Thread.currentThread().getId());
                countDownLatch.countDown();
            };
        }
    }
    

    使用循环障碍

    另一种方法是使用Cyclic Barrier

    public class WaitForAllToEnd {
    
        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
            final int total_threads = 4;
            CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
            ExecutorService executor = Executors.newFixedThreadPool(total_threads);
            for(int i = 0; i < total_threads; i++){
                executor.execute(parallelWork(100 + i * 100, barrier));
            }
            barrier.await();
            System.out.println("Exit");
            executor.shutdown();
        }
    
        private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
            return () -> {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Do Something
                }
                System.out.println("I am Thread : " + Thread.currentThread().getId());
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                  // Do something
                }
            };
        }
    }
    

    还有其他方法,但这些方法需要更改您的初始要求,即:

    提交后如何等待所有任务完成 使用 ExecutorService.execute() 。

    【讨论】:

      【解决方案2】:

      如果您使用类ThreadPoolExecutor 或其任何子类,您有一个方法getActiveCount() 返回正在执行任务的线程数。因此,您可以轮询该方法,直到它变为 0,这意味着所有任务都已完成并且当前没有新任务正在执行。但是,如果某些任务卡住了怎么办?我认为您还必须给出一些超时时间,以防止在这种情况下出现无限循环。

      这个想法最大的好处是不需要调用shutdown方法

      【讨论】:

        【解决方案3】:

        如果您阅读ExecutorService.awaitTermination 的javadoc(或查看方法签名),您会看到它返回boolean。此布尔值指示 Executor 是否终止。您可以使用该信息创建while 循环以确定它是否已终止。

        ExecutorService executor = ...
        
        executor.shutdown(); // close the executor and don't accept new tasks
        
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS) {}
        

        这样的事情会停止执行器并等待它终止并且所有任务都完成。

        【讨论】:

          【解决方案4】:

          execute 方法不返回任何内容。您可以使用返回 Future 类型结果的 submit 方法。

          Future<String> future = 
            executorService.submit(callableTask/runnableTask);
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2011-03-17
            • 2023-04-04
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2017-04-14
            • 2012-04-13
            相关资源
            最近更新 更多