【问题标题】:ExecutorService, how to wait for all tasks to finishExecutorService,如何等待所有任务完成
【发布时间】:2011-03-17 05:09:37
【问题描述】:

等待ExecutorService 的所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的作业——每个核心上一个。现在我的设置如下所示:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask 实现可运行。这似乎可以正确执行任务,但代码在wait()IllegalMonitorStateException 上崩溃。这很奇怪,因为我玩了一些玩具示例,它似乎可以工作。

uniquePhrases 包含数万个元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西

【问题讨论】:

    标签: java multithreading threadpool executorservice


    【解决方案1】:

    如果您想等待所有任务完成,请使用shutdown 方法而不是wait。然后用awaitTermination 关注它。

    此外,您可以使用Runtime.availableProcessors 获取硬件线程数,以便正确初始化线程池。

    【讨论】:

    • shutdown() 停止 ExecutorService 接受新任务并关闭空闲的工作线程。未指定等待关闭完成,ThreadPoolExecutor中的实现不等待。
    • @Alain - 谢谢。我应该提到 awaitTermination。固定。
    • 如果为了完成任务必须安排更多任务怎么办?例如,您可以进行多线程树遍历,将分支移交给工作线程。在这种情况下,由于 ExecutorService 立即关闭,它无法接受任何递归调度的作业。
    • awaitTermination 需要超时时间作为参数。虽然可以提供有限的时间并围绕它放置一个循环以等待所有线程完成,但我想知道是否有更优雅的解决方案。
    • 你是对的,但是看到这个答案 - stackoverflow.com/a/1250655/263895 - 你总是可以给它一个非常长的超时时间
    【解决方案2】:

    如果您想等待执行器服务完成执行,请调用shutdown(),然后调用awaitTermination(units, unitType),例如awaitTermination(1, MINUTE)。 ExecutorService 不会在它自己的监视器上阻塞,所以你不能使用wait 等。

    【讨论】:

    • 我认为是 awaitTermination。
    • @SB - 谢谢 - 我知道我的记忆是错误的!我已经更新了名称并添加了一个链接以确保。
    • 要“永远”等待,就像 awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); stackoverflow.com/a/1250655/32453
    • 我认为这是最简单的方法
    • @MosheElisha,你确定吗? docs.oracle.com/javase/8/docs/api/java/util/concurrent/… 表示启动有序关闭,其中执行先前提交的任务,但不会接受新任务。
    【解决方案3】:

    如果等待ExecutorService 中的所有任务完成并不是您的目标,而是等待特定批次的任务完成,您可以使用CompletionService — 特别是,ExecutorCompletionService

    我们的想法是创建一个ExecutorCompletionService,通过CompletionService 包装您的Executorsubmit 一些已知数量的任务,然后绘制该相同数量 > 完成队列的结果,使用take()(阻止)或poll()(不阻止)。一旦您绘制了与您提交的任务相对应的所有预期结果,您就知道它们都已完成。

    让我再说一遍,因为从界面上看并不明显:您必须知道您在CompletionService 中放入了多少东西,才能知道要尝试提取多少东西。这对于take() 方法尤其重要:多次调用它,它会阻塞你的调用线程,直到其他线程向同一个CompletionService 提交另一个作业。

    书中有some examples showing how to use CompletionServiceJava Concurrency in Practice

    【讨论】:

    • 这很好地反驳了我的回答——我想说这个问题的直接答案是 invokeAll();但是@seh 在向 ES 提交作业组并等待它们完成时是正确的......--JA
    • @om-nom-nom,感谢您更新链接。我很高兴看到这个答案仍然有用。
    • 好答案,我不知道CompletionService
    • 如果你不想关闭现有的 ExecutorService,而只想提交一批任务,并且知道它们什么时候都完成,则可以使用这种方法。
    【解决方案4】:

    您可以在某个时间间隔内等待作业完成:

    int maxSecondsPerComputeDTask = 20;
    try {
        while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
            // consider giving up with a 'break' statement under certain conditions
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);    
    }
    

    或者您可以使用 ExecutorService.submit(Runnable) 并收集它返回的 Future 对象并依次调用 get() 以等待它们完成。

    ExecutorService es = Executors.newFixedThreadPool(2);
    Collection<Future<?>> futures = new LinkedList<<Future<?>>();
    for (DataTable singleTable : uniquePhrases) {
        futures.add(es.submit(new ComputeDTask(singleTable)));
    }
    for (Future<?> future : futures) {
       try {
           future.get();
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       } catch (ExecutionException e) {
           throw new RuntimeException(e);
       }
    }
    

    InterruptedException 对正确处理非常重要。它可以让您或您的图书馆的用户安全地终止一个漫长的过程。

    【讨论】:

      【解决方案5】:

      最简单的方法是使用ExecutorService.invokeAll(),它可以满足您的需求。用您的话说,您需要修改或包装ComputeDTask 以实现Callable&lt;&gt;,这可以为您提供更多的灵活性。可能在您的应用程序中有一个有意义的Callable.call() 实现,但如果不使用Executors.callable(),这里有一种包装它的方法。

      ExecutorService es = Executors.newFixedThreadPool(2);
      List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());
      
      for (DataTable singleTable: uniquePhrases) { 
          todo.add(Executors.callable(new ComputeDTask(singleTable))); 
      }
      
      List<Future<Object>> answers = es.invokeAll(todo);
      

      正如其他人所指出的,如果合适,您可以使用invokeAll() 的超时版本。在此示例中,answers 将包含一堆 Futures,它们将返回空值(请参阅 Executors.callable() 的定义。您可能想要做的是轻微的重构,以便您可以得到有用的答案,或者对底层ComputeDTask 的引用,但我无法从您的示例中看出。

      如果不清楚,请注意invokeAll() 在所有任务完成之前不会返回。 (即,如果被询问,answers 集合中的所有Futures 都将报告.isDone()。)这避免了所有手动关闭、awaitTermination 等...并允许您巧妙地重复使用此ExecutorService 多个周期,如果需要的话。

      关于 SO 有几个相关的问题:

      这些都不是你的问题的严格点,但它们确实提供了一些关于人们认为应该如何使用 Executor/ExecutorService 的颜色。

      【讨论】:

      • 如果您要批量添加所有作业并挂在 Callables 列表中,这是完美的,但如果您在调用 ExecutorService.submit()回调或事件循环情况。
      • 我认为值得一提的是,当不再需要 ExecutorService 时,仍然应该调用 shutdown(),否则线程将永远不会终止(除了 corePoolSize=0 或 allowCoreThreadTimeOut=true 的情况)。
      • 太棒了!正是我想要的。非常感谢分享答案。让我试试这个。
      • @Desty 在这种情况下,最好的实现方式是什么?
      【解决方案6】:

      我也有一组文档要爬取的情况。我从应处理的初始“种子”文档开始,该文档包含指向其他也应处理的文档的链接,依此类推。

      在我的主程序中,我只想写如下内容,其中Crawler 控制着一堆线程。

      Crawler c = new Crawler();
      c.schedule(seedDocument); 
      c.waitUntilCompletion()
      

      如果我想导航一棵树,也会发生同样的情况;我会弹出根节点,每个节点的处理器会根据需要将子节点添加到队列中,并且一堆线程会处理树中的所有节点,直到没有更多节点为止。

      我在 JVM 中找不到任何我认为有点令人惊讶的东西。所以我写了一个类 ThreadPool 可以直接使用或子类添加适合域的方法,例如schedule(Document)。希望对您有所帮助!

      ThreadPool Javadoc | Maven

      【讨论】:

      • 文档链接已失效
      【解决方案7】:

      一个简单的替代方法是使用线程和连接。 参考:Joining Threads

      【讨论】:

      • ExecutorServices 让事情变得更简单
      【解决方案8】:

      我将等待执行程序以您认为适合完成任务的指定超时终止。

       try {  
               //do stuff here 
               exe.execute(thread);
          } finally {
              exe.shutdown();
          }
          boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
          if (!result)
      
          {
              LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
          }
      

      【讨论】:

        【解决方案9】:

        添加集合中的所有线程并使用invokeAll 提交。 如果你可以使用ExecutorServiceinvokeAll 方法,JVM 将在所有线程完成之前不会进行下一行。

        这里有一个很好的例子: invokeAll via ExecutorService

        【讨论】:

          【解决方案10】:

          随便用

          latch = new CountDownLatch(noThreads)
          

          在每个线程中

          latch.countDown();
          

          作为障碍

          latch.await();
          

          【讨论】:

          • 不要忘记在等待时捕获 InterruptedException。
          【解决方案11】:

          听起来你需要ForkJoinPool 并使用全局池来执行任务。

          public static void main(String[] args) {
              // the default `commonPool` should be sufficient for many cases.
              ForkJoinPool pool = ForkJoinPool.commonPool(); 
              // The root of your task that may spawn other tasks. 
              // Make sure it submits the additional tasks to the same executor that it is in.
              Runnable rootTask = new YourTask(pool); 
              pool.execute(rootTask);
              pool.awaitQuiescence(...);
              // that's it.
          }
          

          美妙之处在于pool.awaitQuiescence,该方法将阻止利用调用者的线程来执行其任务,然后当它真的为空时返回。

          【讨论】:

            【解决方案12】:

            IllegalMonitorStateException 的根本原因:

            抛出以表明一个线程试图在一个对象的监视器上等待,或者通知其他线程在一个对象的监视器上等待而不拥有指定的监视器。

            从您的代码中,您刚刚在 ExecutorService 上调用了 wait() 而不拥有锁。

            以下代码将修复IllegalMonitorStateException

            try 
            {
                synchronized(es){
                    es.wait(); // Add some condition before you call wait()
                }
            } 
            

            按照以下方法之一等待已提交给ExecutorService的所有任务完成。

            1. ExecutorService 上从submit 遍历所有Future 任务,并在Future 对象上使用阻塞调用get() 检查状态

            2. ExecutorService 上使用invokeAll

            3. 使用CountDownLatch

            4. 使用ForkJoinPoolnewWorkStealingPoolExecutors(Java 8 起)

            5. 按照 oracle 文档 page 中的建议关闭池

              void shutdownAndAwaitTermination(ExecutorService pool) {
                 pool.shutdown(); // Disable new tasks from being submitted
                 try {
                 // Wait a while for existing tasks to terminate
                 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                     pool.shutdownNow(); // Cancel currently executing tasks
                     // Wait a while for tasks to respond to being cancelled
                     if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                     System.err.println("Pool did not terminate");
                 }
              } catch (InterruptedException ie) {
                   // (Re-)Cancel if current thread also interrupted
                   pool.shutdownNow();
                   // Preserve interrupt status
                   Thread.currentThread().interrupt();
              }
              

              如果您想在使用选项 5 而不是选项 1 到 4 时优雅地等待所有任务完成,请更改

              if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
              

              while(condition) 每 1 分钟检查一次。

            【讨论】:

              【解决方案13】:

              将您的任务提交到 Runner,然后等待调用方法 waitTillDone(),如下所示:

              Runner runner = Runner.runner(2);
              
              for (DataTable singleTable : uniquePhrases) {
              
                  runner.run(new ComputeDTask(singleTable));
              }
              
              // blocks until all tasks are finished (or failed)
              runner.waitTillDone();
              
              runner.shutdown();
              

              要使用它,请添加此 gradle/maven 依赖项:'com.github.matejtymes:javafixes:1.0'

              更多详情请看这里:https://github.com/MatejTymes/JavaFixes 或这里:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

              【讨论】:

                【解决方案14】:

                您可以使用ExecutorService.invokeAll 方法,它会执行所有任务并等待所有线程完成它们的任务。

                这里是完整的javadoc

                您还可以使用此方法的重载版本来指定超时。

                这是带有ExecutorService.invokeAll的示例代码

                public class Test {
                    public static void main(String[] args) throws InterruptedException, ExecutionException {
                        ExecutorService service = Executors.newFixedThreadPool(3);
                        List<Callable<String>> taskList = new ArrayList<>();
                        taskList.add(new Task1());
                        taskList.add(new Task2());
                        List<Future<String>> results = service.invokeAll(taskList);
                        for (Future<String> f : results) {
                            System.out.println(f.get());
                        }
                    }
                
                }
                
                class Task1 implements Callable<String> {
                    @Override
                    public String call() throws Exception {
                        try {
                            Thread.sleep(2000);
                            return "Task 1 done";
                        } catch (Exception e) {
                            e.printStackTrace();
                            return " error in task1";
                        }
                    }
                }
                
                class Task2 implements Callable<String> {
                    @Override
                    public String call() throws Exception {
                        try {
                            Thread.sleep(3000);
                            return "Task 2 done";
                        } catch (Exception e) {
                            e.printStackTrace();
                            return " error in task2";
                        }
                    }
                }
                

                【讨论】:

                  【解决方案15】:

                  这个怎么样?

                  Object lock = new Object();
                  CountDownLatch cdl = new CountDownLatch(threadNum);
                  for (int i = 0; i < threadNum; i++) {
                      executorService.execute(() -> {
                  
                          synchronized (lock) {
                              cdl.countDown();
                              try {
                                  lock.wait();
                              } catch (InterruptedException e) {
                                  Thread.currentThread().interrupt();
                              }
                          }
                      });
                  }
                  cdl.await();
                  synchronized (lock) {
                      lock.notifyAll();
                  }
                  

                  如果你不向 ExecutorService 添加新任务,这可能会等待所有当前任务完成

                  【讨论】:

                    【解决方案16】:

                    有几种方法。

                    您可以先调用ExecutorService.shutdown,然后再调用ExecutorService.awaitTermination,它会返回:

                    如果此执行程序终止,则为 true,如果超时,则为 false 终止前

                    所以:

                    有一个函数叫做 awaitTermination 但是必须有一个超时 其中提供。这并不能保证当它返回所有 任务就完成了。有没有办法做到这一点?

                    您只需循环调用awaitTermination

                    使用 awaitTermination

                    此实现的完整示例:

                    public class WaitForAllToEnd {
                    
                        public static void main(String[] args) throws InterruptedException {
                            final int total_threads = 4;
                            ExecutorService executor = Executors.newFixedThreadPool(total_threads);
                            for(int i = 0; i < total_threads; i++){
                                executor.execute(parallelWork(100 + i * 100));
                            }
                    
                            int count = 0;
                    
                            // This is the relevant part
                            // Chose the delay most appropriate for your use case
                            executor.shutdown();
                            while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                                System.out.println("Waiting "+ count);
                                count++;
                            }
                        }
                    
                        private static Runnable parallelWork(long sleepMillis) {
                            return () -> {
                                try {
                                    Thread.sleep(sleepMillis);
                                } catch (InterruptedException e) {
                                    // Do Something
                                }
                                System.out.println("I am Thread : " + Thread.currentThread().getId());
                            };
                        }
                    }
                    

                    使用 CountDownLatch

                    另一种选择是创建一个CountDownLatch,其中count 等于并行任务的数量。每个线程调用countDownLatch.countDown();,而main线程调用countDownLatch.await();

                    此实现的完整示例:

                    public class WaitForAllToEnd {
                    
                        public static void main(String[] args) throws InterruptedException {
                            final int total_threads = 4;
                            CountDownLatch countDownLatch = new CountDownLatch(total_threads);
                            ExecutorService executor = Executors.newFixedThreadPool(total_threads);
                            for(int i = 0; i < total_threads; i++){
                                executor.execute(parallelWork(100 + i * 100, countDownLatch));
                            }
                            countDownLatch.await();
                            System.out.println("Exit");
                            executor.shutdown();
                        }
                    
                        private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
                            return () -> {
                                try {
                                    Thread.sleep(sleepMillis);
                                } catch (InterruptedException e) {
                                    // Do Something
                                }
                                System.out.println("I am Thread : " + Thread.currentThread().getId());
                                countDownLatch.countDown();
                            };
                        }
                    }
                    

                    使用循环障碍

                    另一种方法是使用Cyclic Barrier

                    public class WaitForAllToEnd {
                    
                        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
                            final int total_threads = 4;
                            CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
                            ExecutorService executor = Executors.newFixedThreadPool(total_threads);
                            for(int i = 0; i < total_threads; i++){
                                executor.execute(parallelWork(100 + i * 100, barrier));
                            }
                            barrier.await();
                            System.out.println("Exit");
                            executor.shutdown();
                        }
                    
                        private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
                            return () -> {
                                try {
                                    Thread.sleep(sleepMillis);
                                } catch (InterruptedException e) {
                                    // Do Something
                                }
                                System.out.println("I am Thread : " + Thread.currentThread().getId());
                                try {
                                    barrier.await();
                                } catch (InterruptedException | BrokenBarrierException e) {
                                  // Do something
                                }
                            };
                        }
                    }
                    

                    还有其他方法,但这些方法需要更改您的初始要求,即:

                    提交后如何等待所有任务完成 使用 ExecutorService.execute() 。

                    【讨论】:

                      猜你喜欢
                      • 2021-08-14
                      • 2018-09-22
                      • 2015-08-28
                      • 1970-01-01
                      • 1970-01-01
                      • 2015-02-14
                      • 2017-04-14
                      • 2019-10-24
                      • 1970-01-01
                      相关资源
                      最近更新 更多