【问题标题】:How to wait for all threads to finish, using ExecutorService?如何使用 ExecutorService 等待所有线程完成?
【发布时间】:2015-02-14 15:54:14
【问题描述】:

我需要一次执行 4 个任务,如下所示:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

全部完成后如何获得通知?现在我想不出比设置一些全局任务计数器并在每个任务结束时减少它更好的方法,然后在无限循环中监视这个计数器变为 0;或获取 Futures 列表并在无限循环中为所有这些监视器 isDone。什么是不涉及无限循环的更好的解决方案?

谢谢。

【问题讨论】:

    标签: java multithreading concurrency parallel-processing executorservice


    【解决方案1】:

    使用 Project LoomAutoCloseable 执行器服务上的 Try-with-Resources 语法

    Project Loom 试图为 Java 的并发能力添加新功能。

    其中一个功能是制作ExecutorService AutoCloseable。这意味着每个ExecutorService 实现都将提供一个close 方法。这意味着我们可以使用try-with-resources 语法自动关闭ExecutorService 对象。

    ExecutorService#close 方法会阻塞,直到所有提交的任务都完成。使用close 代替调用shutdownawaitTermination

    成为 AutoCloseable 有助于 Project Loom 尝试将 “structured concurrency” 引入 Java。

    try (
        ExecutorService executorService = Executors.… ;
    ) {
        // Submit your `Runnable`/`Callable` tasks to the executor service.
        …
    }
    // At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
    // After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.
    

    有关 Project Loom 的更多信息,请搜索 Ron Pressler 和 Project Loom 团队的其他人提供的谈话和采访。随着 Project Loom 的发展,关注最近的事情。

    Project Loom 技术的实验版本是 available now,基于早期访问 Java 18

    【讨论】:

      【解决方案2】:

      游戏有点晚了,但为了完成……

      与其“等待”所有任务完成,不如按照好莱坞的原则来思考,“不要打电话给我,我会打电话给你”——当我完成时。 我认为生成的代码更优雅...

      Guava 提供了一些有趣的工具来实现这一点。

      一个例子:

      将 ExecutorService 包装到 ListeningExecutorService 中:

      ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
      

      提交可调用的集合以供执行 ::

      for (Callable<Integer> callable : callables) {
        ListenableFuture<Integer> lf = service.submit(callable);
        // listenableFutures is a collection
        listenableFutures.add(lf)
      });
      

      现在是必不可少的部分:

      ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
      

      将回调附加到 ListenableFuture,当所有未来完成时,您可以使用它来收到通知:

      Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
          @Override
          public void onSuccess(List<Integer> result) {
              // do something with all the results
          }
      
          @Override
          public void onFailure(Throwable t) {
              // log failure
          }
      });
      

      这还提供了一个优势,即您可以在处理完成后将所有结果收集在一个地方......

      更多信息here

      【讨论】:

      • 非常干净。即使在 Android 上也能完美运行。只需要在onSuccess() 中使用runOnUiThread()
      【解决方案3】:

      ExecutorService 的干净方式

       List<Future<Void>> results = null;
       try {
           List<Callable<Void>> tasks = new ArrayList<>();
           ExecutorService executorService = Executors.newFixedThreadPool(4);
           results = executorService.invokeAll(tasks);
       } catch (InterruptedException ex) {
           ...
       } catch (Exception ex) {
           ...
       }
      

      【讨论】:

        【解决方案4】:

        如果您连续使用更多线程 ExecutionServices 并希望等待 EACH EXECUTIONSERVICE 完成。最好的方法如下;

        ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
        for (<loop>) {
           executer1.execute(new Runnable() {
                    @Override
                    public void run() {
                        ...
                    }
                });
        } 
        executer1.shutdown();
        
        try{
           executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        
           ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
           for (true) {
              executer2.execute(new Runnable() {
                    @Override
                    public void run() {
                         ...
                    }
                });
           } 
           executer2.shutdown();
        } catch (Exception e){
         ...
        }
        

        【讨论】:

          【解决方案5】:

          你可以在这个 Runner 类上调用 waitTillDone()

          Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool
          
          while(...) {
              runner.run(new MyTask()); // here you submit your task
          }
          
          
          runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)
          
          
          runner.shutdown(); // once you done you can shutdown the runner
          

          您可以重用这个类并在调用shutdown()之前尽可能多次地调用waitTillDone(),而且您的代码非常简单。此外,您不必提前知道任务数量

          要使用它,只需将此 gradle/maven compile 'com.github.matejtymes:javafixes:1.3.1' 依赖项添加到您的项目中。

          更多细节可以在这里找到:

          https://github.com/MatejTymes/JavaFixes

          【讨论】:

            【解决方案6】:
            
            ExecutorService WORKER_THREAD_POOL 
              = Executors.newFixedThreadPool(10);
            CountDownLatch latch = new CountDownLatch(2);
            for (int i = 0; i < 2; i++) {
                WORKER_THREAD_POOL.submit(() -> {
                    try {
                        // doSomething();
                        latch.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            
            // wait for the latch to be decremented by the two remaining threads
            latch.await();
            

            如果doSomething()抛出一些其他异常,latch.countDown()似乎不会执行,那该怎么办?

            【讨论】:

            • 如果你简单地添加 finally 并放上你的latch.CountDown()
            【解决方案7】:

            我创建了以下工作示例。这个想法是有一种方法来处理具有许多线程(由 numberOfTasks/threshold 以编程方式确定)的任务池(我以队列为例),并等到所有线程完成以继续进行一些其他处理。

            import java.util.PriorityQueue;
            import java.util.Queue;
            import java.util.concurrent.CountDownLatch;
            import java.util.concurrent.ExecutorService;
            import java.util.concurrent.Executors;
            
            /** Testing CountDownLatch and ExecutorService to manage scenario where
             * multiple Threads work together to complete tasks from a single
             * resource provider, so the processing can be faster. */
            public class ThreadCountDown {
            
            private CountDownLatch threadsCountdown = null;
            private static Queue<Integer> tasks = new PriorityQueue<>();
            
            public static void main(String[] args) {
                // Create a queue with "Tasks"
                int numberOfTasks = 2000;
                while(numberOfTasks-- > 0) {
                    tasks.add(numberOfTasks);
                }
            
                // Initiate Processing of Tasks
                ThreadCountDown main = new ThreadCountDown();
                main.process(tasks);
            }
            
            /* Receiving the Tasks to process, and creating multiple Threads
            * to process in parallel. */
            private void process(Queue<Integer> tasks) {
                int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
                threadsCountdown = new CountDownLatch(numberOfThreads);
                ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);
            
                //Initialize each Thread
                while(numberOfThreads-- > 0) {
                    System.out.println("Initializing Thread: "+numberOfThreads);
                    threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
                }
            
                try {
                    //Shutdown the Executor, so it cannot receive more Threads.
                    threadExecutor.shutdown();
                    threadsCountdown.await();
                    System.out.println("ALL THREADS COMPLETED!");
                    //continue With Some Other Process Here
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
            
            /* Determine the number of Threads to create */
            private int getNumberOfThreadsRequired(int size) {
                int threshold = 100;
                int threads = size / threshold;
                if( size > (threads*threshold) ){
                    threads++;
                }
                return threads;
            }
            
            /* Task Provider. All Threads will get their task from here */
            private synchronized static Integer getTask(){
                return tasks.poll();
            }
            
            /* The Threads will get Tasks and process them, while still available.
            * When no more tasks available, the thread will complete and reduce the threadsCountdown */
            private class MyThread implements Runnable {
            
                private String threadName;
            
                protected MyThread(String threadName) {
                    super();
                    this.threadName = threadName;
                }
            
                @Override
                public void run() {
                    Integer task;
                    try{
                        //Check in the Task pool if anything pending to process
                        while( (task = getTask()) != null ){
                            processTask(task);
                        }
                    }catch (Exception ex){
                        ex.printStackTrace();
                    }finally {
                        /*Reduce count when no more tasks to process. Eventually all
                        Threads will end-up here, reducing the count to 0, allowing
                        the flow to continue after threadsCountdown.await(); */
                        threadsCountdown.countDown();
                    }
                }
            
                private void processTask(Integer task){
                    try{
                        System.out.println(this.threadName+" is Working on Task: "+ task);
                    }catch (Exception ex){
                        ex.printStackTrace();
                    }
                }
            }
            }
            

            希望对你有帮助!

            【讨论】:

              【解决方案8】:

              这里有两个选项,只是有点混淆哪个最好。

              选项 1:

              ExecutorService es = Executors.newFixedThreadPool(4);
              List<Runnable> tasks = getTasks();
              CompletableFuture<?>[] futures = tasks.stream()
                                             .map(task -> CompletableFuture.runAsync(task, es))
                                             .toArray(CompletableFuture[]::new);
              CompletableFuture.allOf(futures).join();    
              es.shutdown();
              

              选项 2:

              ExecutorService es = Executors.newFixedThreadPool(4);
              List< Future<?>> futures = new ArrayList<>();
              for(Runnable task : taskList) {
                  futures.add(es.submit(task));
              }
              
              for(Future<?> future : futures) {
                  try {
                      future.get();
                  }catch(Exception e){
                      // do logging and nothing else
                  }
              }
              es.shutdown();
              

              这里放future.get();在 try catch 中是个好主意吗?

              【讨论】:

              • 最好让future.get调用定时future.get(10, TimeUnit.SECONDS);并捕获TimeoutException
              【解决方案9】:

              这是我的解决方案,基于“AdamSkywalker”提示,它有效

              package frss.main;
              
              import java.util.ArrayList;
              import java.util.List;
              import java.util.concurrent.CompletableFuture;
              import java.util.concurrent.ExecutorService;
              import java.util.concurrent.Executors;
              
              public class TestHilos {
              
                  void procesar() {
                      ExecutorService es = Executors.newFixedThreadPool(4);
                      List<Runnable> tasks = getTasks();
                      CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
                      CompletableFuture.allOf(futures).join();
                      es.shutdown();
              
                      System.out.println("FIN DEL PROCESO DE HILOS");
                  }
              
                  private List<Runnable> getTasks() {
                      List<Runnable> tasks = new ArrayList<Runnable>();
              
                      Hilo01 task1 = new Hilo01();
                      tasks.add(task1);
              
                      Hilo02 task2 = new Hilo02();
                      tasks.add(task2);
                      return tasks;
                  }
              
                  private class Hilo01 extends Thread {
              
                      @Override
                      public void run() {
                          System.out.println("HILO 1");
                      }
              
                  }
              
                  private class Hilo02 extends Thread {
              
                      @Override
                      public void run() {
                          try {
                              sleep(2000);
                          }
                          catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("HILO 2");
                      }
              
                  }
              
              
                  public static void main(String[] args) {
                      TestHilos test = new TestHilos();
                      test.procesar();
                  }
              }
              

              【讨论】:

                【解决方案10】:

                所以我在这里发布链接问题的答案,以防有人想要更简单的方法来做到这一点

                ExecutorService executor = Executors.newFixedThreadPool(10);
                CompletableFuture[] futures = new CompletableFuture[10];
                int i = 0;
                while (...) {
                    futures[i++] =  CompletableFuture.runAsync(runner, executor);
                }
                
                CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
                

                【讨论】:

                  【解决方案11】:

                  遵循以下方法之一。

                  1. 遍历所有Future 任务,从ExecutorService 上的submit 返回,并按照Kiran 的建议在Future 对象上使用阻塞调用get() 检查状态
                  2. ExecutorService 上使用invokeAll()
                  3. CountDownLatch
                  4. ForkJoinPoolExecutors.html#newWorkStealingPool
                  5. 正确使用ThreadPoolExecutor的shutdown, awaitTermination, shutdownNow API

                  相关的 SE 问题:

                  How is CountDownLatch used in Java Multithreading?

                  How to properly shutdown java ExecutorService

                  【讨论】:

                    【解决方案12】:

                    您可以使用以下代码:

                    public class MyTask implements Runnable {
                    
                        private CountDownLatch countDownLatch;
                    
                        public MyTask(CountDownLatch countDownLatch {
                             this.countDownLatch = countDownLatch;
                        }
                    
                        @Override
                        public void run() {
                             try {
                                 //Do somethings
                                 //
                                 this.countDownLatch.countDown();//important
                             } catch (InterruptedException ex) {
                                  Thread.currentThread().interrupt();
                             }
                         }
                    }
                    
                    CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
                    ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
                    for (int i = 0; i < NUMBER_OF_TASKS; i++){
                         taskExecutor.execute(new MyTask(countDownLatch));
                    }
                    countDownLatch.await();
                    System.out.println("Finish tasks");
                    

                    【讨论】:

                      【解决方案13】:

                      在 Java8 中,您可以使用 CompletableFuture

                      ExecutorService es = Executors.newFixedThreadPool(4);
                      List<Runnable> tasks = getTasks();
                      CompletableFuture<?>[] futures = tasks.stream()
                                                     .map(task -> CompletableFuture.runAsync(task, es))
                                                     .toArray(CompletableFuture[]::new);
                      CompletableFuture.allOf(futures).join();    
                      es.shutdown();
                      

                      【讨论】:

                      • 这是一个非常优雅的解决方案。
                      • ExecutorService es = Executors.newFixedThreadPool(4); List&lt; Future&lt;?&gt;&gt; futures = new ArrayList&lt;&gt;(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future&lt;?&gt; future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } }
                      • @AdamSkywalker 在 es.shutdown() 之后是否需要 awaitTermination()?
                      • @gaurav 当你调用shutdown时,有些任务可能还没有完成。所以 awaitTermination 会阻塞调用线程,直到一切都完成。这取决于你是否需要在这个线程中等待结果。
                      • @AdamSkywalker 很好的答案。如果我不需要等待结果,不调用 awaitTermination() 是有意义的。
                      【解决方案14】:

                      Java 5 及更高版本中的 CyclicBarrier 类就是为这类事情而设计的。

                      【讨论】:

                      • 酷,永远记不住这个数据结构的名字。但是,仅当您事先知道将排队的任务数量时才适用。
                      • 是的,你认为你可以用当前线程和所有子线程来突破障碍,然后当你通过它时,你会知道子线程已经完成......
                      • 其实答案是错误的。 CyclicBarrier 专为部分而设计。 CountDownLatch 专为等待事件设计
                      【解决方案15】:

                      ExecutorService.invokeAll() 为你做。

                      ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
                      List<Callable<?>> tasks; // your tasks
                      // invokeAll() returns when all tasks are complete
                      List<Future<?>> futures = taskExecutor.invokeAll(tasks);
                      

                      【讨论】:

                      • 如果/当你开始“4”个线程时,困难就来了,一次一个,分段,然后加入/让完成所有 4 个......
                      • 这种方法只有在事先知道任务数量的情况下才有效。
                      • 我认为当futures返回时,任务还没有完成。它们将来可能会完成,您将获得结果的链接。这就是为什么它被称为Future。您有方法Future.get(),它将等待任务完成以获取结果。
                      • @AlikElzin-kilaka 来自 JavaDocs 的引用(链接在答案中):“执行给定的任务,在全部完成时返回一个持有其状态和结果的 Futures 列表。Future.isDone() 为真对于返回列表的每个元素。”
                      • 请注意,executorService.invokeAll 将等待所有线程完成,但您仍需要调用 executorService.shutdown 来清理线程池。
                      【解决方案16】:

                      使用CountDownLatch

                      CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
                      ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
                      while(...) {
                        taskExecutor.execute(new MyTask());
                      }
                      
                      try {
                        latch.await();
                      } catch (InterruptedException E) {
                         // handle
                      }
                      

                      在你的任务中(包含在 try / finally 中)

                      latch.countDown();
                      

                      【讨论】:

                      • 没有 4 个任务。一次完成 4 项“一些任务”。
                      • 对不起,我误解了这个问题。是的,任务数应该是 CountDownLatch 构造函数的参数
                      • 我觉得这个解决方案比其他解决方案更优雅,看起来就是为此目的而设计的,而且简单明了。
                      • 开始前不知道任务数怎么办?
                      • @cletus - 那么你不使用 CountDownLatch :-) 请注意,我并不是说这种方法比你的更好。但是,我发现在现实生活场景中,我确实知道任务的数量,线程池设置确实需要根据部署进行配置,并且池可以 i> 被重复使用。所以我通常有 Spring 注入的线程池并将它们设置为原型并手动关闭它们 only 以等待线程完成似乎不太理想。
                      【解决方案17】:

                      基本上是在ExecutorService 上调用shutdown() 然后awaitTermination()

                      ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
                      while(...) {
                        taskExecutor.execute(new MyTask());
                      }
                      taskExecutor.shutdown();
                      try {
                        taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                      } catch (InterruptedException e) {
                        ...
                      }
                      

                      【讨论】:

                      • 这正是 shutdown / awaitTermination 的意思
                      • 如果此任务处理是一次性事件,这是一个很好的模式。但是,如果在同一运行时重复执行此操作,则不是最佳的,因为每次执行时都会重复创建和拆除线程。
                      • 我正在寻找Long.MAX_VALUE, TimeUnit.NANOSECONDS 相当于没有超时的任何官方文档。
                      • 我不敢相信你必须使用关机才能加入所有当前线程(使用关机后,你不能再次使用执行程序)。建议改用 Future 的列表...
                      • @SamHarwell 请参阅Timing 部分下的java.util.concurrentdocumentation要“永远”等待,您可以使用Long.MAX_VALUE 的值跨度>
                      【解决方案18】:

                      Java 8 - 我们可以使用流 API 来处理流。请看下面的sn-p

                      final List<Runnable> tasks = ...; //or any other functional interface
                      tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
                      
                      //alternatively to specify parallelism 
                      new ForkJoinPool(15).submit(
                                () -> tasks.stream().parallel().forEach(Runnable::run) 
                          ).get();
                      

                      【讨论】:

                      • 嗨弗拉德,欢迎来到 StackOverflow。您能否编辑您的答案以解释这如何回答问题以及代码的作用?此处不鼓励仅使用代码的答案。谢谢!
                      • 这篇文章讨论了并发。并行!=并发
                      【解决方案19】:

                      您应该使用executorService.shutdown()executorService.awaitTermination 方法。

                      一个例子如下:

                      public class ScheduledThreadPoolExample {
                      
                          public static void main(String[] args) throws InterruptedException {
                              ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
                              executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                                      0, 1, TimeUnit.SECONDS);
                      
                              TimeUnit.SECONDS.sleep(10);
                              executorService.shutdown();
                              executorService.awaitTermination(1, TimeUnit.DAYS);
                          }
                      
                      }
                      

                      【讨论】:

                      • 是在shutdown()/之后需要的awaitTermination()
                      【解决方案20】:

                      只是为了在这里提供更多与使用闩锁/屏障不同的替代方案。 您还可以使用CompletionService 获取部分结果,直到所有结果都完成。

                      来自 Java 并发实践: “如果您有一批计算要提交给 Executor,并且您想在它们变为 可用,您可以保留与每个任务关联的 Future 并通过调用 get 重复轮询完成 超时为零。这是可能的,但乏味。幸运的是,有一个更好的方法:完成服务。”

                      这里是实现

                      public class TaskSubmiter {
                          private final ExecutorService executor;
                          TaskSubmiter(ExecutorService executor) { this.executor = executor; }
                          void doSomethingLarge(AnySourceClass source) {
                              final List<InterestedResult> info = doPartialAsyncProcess(source);
                              CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
                              for (final InterestedResult interestedResultItem : info)
                                  completionService.submit(new Callable<PartialResult>() {
                                      public PartialResult call() {
                                          return InterestedResult.doAnOperationToGetPartialResult();
                                      }
                              });
                      
                          try {
                              for (int t = 0, n = info.size(); t < n; t++) {
                                  Future<PartialResult> f = completionService.take();
                                  PartialResult PartialResult = f.get();
                                  processThisSegment(PartialResult);
                                  }
                              } 
                              catch (InterruptedException e) {
                                  Thread.currentThread().interrupt();
                              } 
                              catch (ExecutionException e) {
                                  throw somethinghrowable(e.getCause());
                              }
                          }
                      }
                      

                      【讨论】:

                        【解决方案21】:

                        这可能会有所帮助

                        Log.i(LOG_TAG, "shutting down executor...");
                        executor.shutdown();
                        while (true) {
                                        try {
                                            Log.i(LOG_TAG, "Waiting for executor to terminate...");
                                            if (executor.isTerminated())
                                                break;
                                            if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                                                break;
                                            }
                                        } catch (InterruptedException ignored) {}
                                    }
                        

                        【讨论】:

                          【解决方案22】:

                          您也可以使用期货列表:

                          List<Future> futures = new ArrayList<Future>();
                          // now add to it:
                          futures.add(executorInstance.submit(new Callable<Void>() {
                            public Void call() throws IOException {
                               // do something
                              return null;
                            }
                          }));
                          

                          然后当你想加入所有这些时,它基本上相当于加入每个,(额外的好处是它将异常从子线程重新引发到主线程):

                          for(Future f: this.futures) { f.get(); }
                          

                          基本上,诀窍是一次在每个 Future 上调用 .get(),而不是在(全部或每个)上无限循环调用 isDone()。因此,一旦最后一个线程完成,您就可以保证“继续”通过并越过这个块。需要注意的是,由于 .get() 调用重新引发异常,如果其中一个线程死亡,您可能会在其他线程完成完成之前从这里引发[为避免这种情况,您可以在周围添加 catch ExecutionException get 调用]。另一个警告是它保留对所有线程的引用,因此如果它们具有线程局部变量,直到您通过此块之后才会收集它们(尽管您可以通过删除来解决这个问题,如果它成为一个问题Future 不在 ArrayList 中)。如果您想知道哪个 Future “先完成”,您可以使用 https://stackoverflow.com/a/31885029/32453

                          【讨论】:

                          【解决方案23】:

                          在执行器getActiveCount() 中有一个方法——它给出了活动线程的计数。

                          跨越线程后,我们可以检查activeCount()的值是否为0。一旦该值为零,则表示当前没有活动线程在运行,这意味着任务已完成:

                          while (true) {
                              if (executor.getActiveCount() == 0) {
                              //ur own piece of code
                              break;
                              }
                          }
                          

                          【讨论】:

                          【解决方案24】:

                          您可以使用您自己的ExecutorCompletionService 子类来包装taskExecutor,并使用您自己的BlockingQueue 实现来在每个任务完成时获得通知,并在完成的任务数量达到时执行您想要的任何回调或其他操作你想要的目标。

                          【讨论】:

                            【解决方案25】:

                            我刚刚编写了一个示例程序来解决您的问题。没有给出简洁的实现,所以我会添加一个。虽然您可以使用executor.shutdown()executor.awaitTermination(),但这并不是最佳做法,因为不同线程所花费的时间是不可预测的。

                            ExecutorService es = Executors.newCachedThreadPool();
                                List<Callable<Integer>> tasks = new ArrayList<>();
                            
                                for (int j = 1; j <= 10; j++) {
                                    tasks.add(new Callable<Integer>() {
                            
                                        @Override
                                        public Integer call() throws Exception {
                                            int sum = 0;
                                            System.out.println("Starting Thread "
                                                    + Thread.currentThread().getId());
                            
                                            for (int i = 0; i < 1000000; i++) {
                                                sum += i;
                                            }
                            
                                            System.out.println("Stopping Thread "
                                                    + Thread.currentThread().getId());
                                            return sum;
                                        }
                            
                                    });
                                }
                            
                                try {
                                    List<Future<Integer>> futures = es.invokeAll(tasks);
                                    int flag = 0;
                            
                                    for (Future<Integer> f : futures) {
                                        Integer res = f.get();
                                        System.out.println("Sum: " + res);
                                        if (!f.isDone()) 
                                            flag = 1;
                                    }
                            
                                    if (flag == 0)
                                        System.out.println("SUCCESS");
                                    else
                                        System.out.println("FAILED");
                            
                                } catch (InterruptedException | ExecutionException e) {
                                    e.printStackTrace();
                                }
                            

                            【讨论】:

                            • 很高兴你展示了future.get的使用——了解的好选择。但是为什么您认为永远等待比设置一些可接受的最大超时时间更好?更重要的是,没有理由做所有这些逻辑,如果你想等待(基本上是永远)直到所有任务完成,你可以简单地给 awaitTermination 一个非常非常长的时间。
                            • 这与这里已经提出的解决方案没有什么不同。您的解决方案与@sjlee 提出的相同
                            • 不知道为什么根据 oracle 文档需要检查完成时,invokeAll 将仅返回“当全部完成或超时到期时,以先发生者为准”
                            【解决方案26】:

                            只有我的两分钱。 要克服CountDownLatch 事先知道任务数量的要求,您可以使用简单的Semaphore 以旧方式进行。

                            ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
                            int numberOfTasks=0;
                            Semaphore s=new Semaphore(0);
                            while(...) {
                                taskExecutor.execute(new MyTask());
                                numberOfTasks++;
                            }
                            
                            try {
                                s.aquire(numberOfTasks);
                            ...
                            

                            在您的任务中,只需拨打s.release() 就像您拨打latch.countDown(); 一样

                            【讨论】:

                            • 看到这个,我首先想知道如果在acquire 调用之前发生一些release 调用会不会有问题,但是在阅读了 Semaphore 文档后,我认为这没关系。
                            【解决方案27】:

                            您可以将您的任务包装在另一个可运行的文件中,这将发送通知:

                            taskExecutor.execute(new Runnable() {
                              public void run() {
                                taskStartedNotification();
                                new MyTask().run();
                                taskFinishedNotification();
                              }
                            });
                            

                            【讨论】:

                            • 花了我一段时间看看这将如何解决 OP 的问题。首先,请注意这个包装是 each 任务,而不是启动所有任务的代码。据推测,每次开始都会增加一个计数器,每次完成都会减少该计数器,或者会增加一个completed 计数器。因此,在启动它们之后,在每次通知时,可以确定 所有 任务是否已完成。请注意,使用try/finally 非常重要,这样即使任务失败也会发出完成通知(或catch 块中的替代通知)。否则,将永远等待。
                            猜你喜欢
                            • 1970-01-01
                            • 2018-09-22
                            • 2011-03-17
                            • 2019-10-24
                            • 1970-01-01
                            • 2021-09-20
                            • 1970-01-01
                            • 2010-09-20
                            • 1970-01-01
                            相关资源
                            最近更新 更多