【问题标题】:When should I use a CompletionService over an ExecutorService?什么时候应该使用 CompletionService 而不是 ExecutorService?
【发布时间】:2011-06-22 04:26:11
【问题描述】:

我刚刚在this blog post 中找到了CompletionService。但是,这并没有真正展示 CompletionService 相对于标准 ExecutorService 的优势。可以用其中任何一个编写相同的代码。那么,CompletionService 什么时候有用?

您能否提供一个简短的代码示例以使其一目了然?例如,此代码示例仅显示不需要 CompletionService 的位置(=相当于 ExecutorService)

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

【问题讨论】:

    标签: java multithreading concurrency completion-service


    【解决方案1】:
    package com.barcap.test.test00;
    
    import java.util.concurrent.*;
    
    /**
     * Created by Sony on 25-04-2019.
     */
    public class ExecutorCompletest00 {
    
        public static void main(String[] args) {
    
            ExecutorService exc= Executors.newFixedThreadPool( 10 );
            ExecutorCompletionService executorCompletionService= new ExecutorCompletionService( exc );
    
            for (int i=1;i<10;i++){
                Task00 task00= new Task00( i );
                executorCompletionService.submit( task00 );
            }
            for (int i=1;i<20;i++){
                try {
                    Future<Integer> future= (Future <Integer>) executorCompletionService.take();
                    Integer inttest=future.get();
                    System.out.println(" the result of completion service is "+inttest);
    
                   break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    ================================================ ========

    package com.barcap.test.test00;
    
    import java.util.*;
    import java.util.concurrent.*;
    
    /**
     * Created by Sony on 25-04-2019.
     */
    public class ExecutorServ00 {
    
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool( 9 );
            List<Future> futList= new ArrayList <>(  );
            for (int i=1;i<10;i++) {
               Future result= executorService.submit( new Task00( i ) );
               futList.add( result );
            }
    
             for (Future<Integer> futureEach :futList ){
                 try {
                  Integer inm=   futureEach.get();
    
                     System.out.println("the result of future executorservice is "+inm);
                     break;
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } catch (ExecutionException e) {
                     e.printStackTrace();
                 }
             }
        }
    }
    

    ================================================ =============

    package com.barcap.test.test00;
    
    import java.util.concurrent.*;
    
    /**
     * Created by Sony on 25-04-2019.
     */
    public class Task00 implements Callable<Integer> {
    
        int i;
    
        public Task00(int i) {
            this.i = i;
        }
    
        @Override
        public Integer call() throws Exception {
            System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
            int sleepforsec=100000/i;
             Thread.sleep( sleepforsec );
            System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);
    
    
    
            return i;
        }
    }
    

    ================================================ ========================

    执行器完成服务的日志差异: 当前线程是 pool-1-thread-1 结果应该是 1 当前线程是 pool-1-thread-2 结果应该是 2 当前线程是 pool-1-thread-3 结果应该是 3 当前线程是 pool-1-thread-4 结果应该是 4 当前线程是 pool-1-thread-6 结果应该是 6 当前线程是 pool-1-thread-5 结果应该是 5 当前线程是 pool-1-thread-7 结果应该是 7 当前线程是 pool-1-thread-9 结果应该是 9 当前线程是 pool-1-thread-8 结果应该是 8 为 pool-1-thread-9 完成的任务结果应该是 9 结果是 9 为 pool-1-thread-8 完成的任务结果应该是 8 为 pool-1-thread-7 完成的任务结果应该是 7 为 pool-1-thread-6 完成的任务结果应该是 6 为 pool-1-thread-5 完成的任务结果应该是 5 为 pool-1-thread-4 完成的任务结果应该是 4 为 pool-1-thread-3 完成的任务结果应该是 3

    为 pool-1-thread-2 完成的任务结果应该是 2

    当前线程是 pool-1-thread-1 结果应该是 1 当前线程是 pool-1-thread-3 结果应该是 3 当前线程是 pool-1-thread-2 结果应该是 2 当前线程是 pool-1-thread-5 结果应该是 5 当前线程是 pool-1-thread-4 结果应该是 4 当前线程是 pool-1-thread-6 结果应该是 6 当前线程是 pool-1-thread-7 结果应该是 7 当前线程是 pool-1-thread-8 结果应该是 8 当前线程是 pool-1-thread-9 结果应该是 9 为 pool-1-thread-9 完成的任务结果应该是 9 为 pool-1-thread-8 完成的任务结果应该是 8 为 pool-1-thread-7 完成的任务结果应该是 7 为 pool-1-thread-6 完成的任务结果应该是 6 为 pool-1-thread-5 完成的任务结果应该是 5 为 pool-1-thread-4 完成的任务结果应该是 4 为 pool-1-thread-3 完成的任务结果应该是 3 为 pool-1-thread-2 完成的任务结果应该是 2 为 pool-1-thread-1 完成的任务结果应该是 1 未来的结果是1

    ================================================ ========

    对于 executorservice,只有在所有任务完成后才能获得结果。

    executor completionservice 任何可用的结果都会返回。

    【讨论】:

      【解决方案2】:

      使用completionservice还有另一个好处:性能

      当您拨打future.get() 时,您正在等待:

      来自java.util.concurrent.CompletableFuture

        private Object waitingGet(boolean interruptible) {
              Signaller q = null;
              boolean queued = false;
              int spins = -1;
              Object r;
              while ((r = result) == null) {
                  if (spins < 0)
                      spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                          1 << 8 : 0; // Use brief spin-wait on multiprocessors
                  else if (spins > 0) {
                      if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                          --spins;
                  }
      

      当您有一个长时间运行的任务时,这将对性能造成灾难。

      使用completionservice,一旦任务完成,它的结果将被排队,您可以手动轮询性能较低的队列。

      completionservice 通过使用带有 done 钩子的 wrap 任务来实现这一点。

      java.util.concurrent.ExecutorCompletionService

          private class QueueingFuture extends FutureTask<Void> {
          QueueingFuture(RunnableFuture<V> task) {
              super(task, null);
              this.task = task;
          }
          protected void done() { completionQueue.add(task); }
          private final Future<V> task;
      }
      

      【讨论】:

      • 您只发布了实际代码的片段,但即使在那里,注释“brief spin-wait”表明此方法不会一直旋转等待。此外,在不知道完成服务使用的特定队列如何实现其poll 方法的情况下,没有理由声称它具有“较低的性能开销”。
      【解决方案3】:

      如果任务生产者对结果不感兴趣,并且由另一个组件负责处理执行器服务执行的异步任务的结果,那么您应该使用CompletionService。它可以帮助您将任务结果处理器与任务生产者分开。见例子http://www.zoftino.com/java-concurrency-executors-framework-tutorial

      【讨论】:

        【解决方案4】:

        使用ExecutorService,一旦您提交了要运行的任务,您需要手动编写代码以高效地完成任务的结果。

        使用CompletionService,这几乎是自动化的。您提供的代码中的差异不是很明显,因为您只提交了一项任务。但是,假设您有一个要提交的任务列表。在下面的示例中,将多个任务提交到 CompletionService。然后,它不再试图找出哪个任务已完成(以获取结果),而只是要求 CompletionService 实例在结果可用时返回它们。

        public class CompletionServiceTest {
        
                class CalcResult {
                     long result ;
        
                     CalcResult(long l) {
                         result = l;
                     }
                }
        
                class CallableTask implements Callable<CalcResult> {
                    String taskName ;
                    long  input1 ;
                    int input2 ;
        
                    CallableTask(String name , long v1 , int v2 ) {
                        taskName = name;
                        input1 = v1;
                        input2 = v2 ;
                    }
        
                    public CalcResult call() throws Exception {
                        System.out.println(" Task " + taskName + " Started -----");
                        for(int i=0;i<input2 ;i++) {
                            try {
                                Thread.sleep(200);
                            } catch (InterruptedException e) {
                                System.out.println(" Task " + taskName + " Interrupted !! ");
                                e.printStackTrace();
                            }
                            input1 += i;
                        }
                        System.out.println(" Task " + taskName + " Completed @@@@@@");
                        return new CalcResult(input1) ;
                    }
        
                }
        
                public void test(){
                    ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
                    CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);
        
                    int submittedTasks = 5;
                    for (int i=0;i< submittedTasks;i++) {
                        taskCompletionService.submit(new CallableTask (
                                String.valueOf(i), 
                                    (i * 10), 
                                    ((i * 10) + 10  )
                                ));
                       System.out.println("Task " + String.valueOf(i) + "subitted");
                    }
                    for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                        try {
                            System.out.println("trying to take from Completion service");
                            Future<CalcResult> result = taskCompletionService.take();
                            System.out.println("result for a task availble in queue.Trying to get()");
                            // above call blocks till atleast one task is completed and results availble for it
                            // but we dont have to worry which one
        
                            // process the result here by doing result.get()
                            CalcResult l = result.get();
                            System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));
        
                        } catch (InterruptedException e) {
                            // Something went wrong with a task submitted
                            System.out.println("Error Interrupted exception");
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            // Something went wrong with the result
                            e.printStackTrace();
                            System.out.println("Error get() threw exception");
                        }
                    }
                }
            }
        

        【讨论】:

        • 另一个例子参见 Java 并发实践 pg。 130. CompletionService 用于在图像可用时对其进行渲染。
        • 安全地假设 CompletionService 上的 takepoll 是线程安全的?在您的示例中,当您第一次调用 take() 时任务仍在执行,我没有看到任何显式同步。
        • take() 确实是线程安全的。您可以阅读 JavaDocs,但基本上take() 将等待下一个完成的结果,然后返回。 CompletionServiceBlockingQueue 一起用于输出。
        • 有没有更好的方法来知道 ExecutorCompletionService 何时完成所有任务,而不是跟踪提交的任务数量?
        • @DebD,当没有更多结果时调用take() 将导致该线程无限期等待。没有例外。你必须设计你的逻辑来捕捉这种情况并退出等待。这应该不难——您通常有办法知道您的所有任务都已完成,而没有 CompletionService 告诉您这一事实。
        【解决方案5】:

        假设您有 5 个长时间运行的任务(可调用任务),并且您已将这些任务提交给执行器服务。现在想象一下,您不想等待所有 5 个任务都竞争,而是希望在任何一个任务完成时对这些任务进行某种处理。现在,这可以通过在未来对象上编写轮询逻辑或使用此 API 来完成。

        【讨论】:

          【解决方案6】:

          省略很多细节:

          • ExecutorService = 传入队列 + 工作线程
          • CompletionService = 传入队列 + 工作线程 + 输出队列

          【讨论】:

            【解决方案7】:

            在运行时自行查看,尝试实现这两种解决方案(Executorservice 和 Completionservice),您会看到它们的行为有多么不同,并且何时使用其中一种会更清楚。 这里有一个例子如果你想要http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html

            【讨论】:

              【解决方案8】:

              首先,如果我们不想浪费处理器时间,我们就不会使用

              while (!future.isDone()) {
                      // Do some work...
              }
              

              我们必须使用

              service.shutdown();
              service.awaitTermination(14, TimeUnit.DAYS);
              

              这段代码的坏处是它会关闭ExecutorService。如果我们想继续使用它(即我们有一些递归任务创建),我们有两种选择:invokeAll 或ExecutorService

              invokeAll 将等到所有任务完成。 ExecutorService 使我们能够一一获取或投票结果。

              最后是递归示例:

              ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
              ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
              
              while (Tasks.size() > 0) {
                  for (final Task task : Tasks) {
                      completionService.submit(new Callable<String>() {   
                          @Override
                          public String call() throws Exception {
                              return DoTask(task);
                          }
                      });
                  } 
              
                  try {                   
                      int taskNum = Tasks.size();
                      Tasks.clear();
                      for (int i = 0; i < taskNum; ++i) {
                          Result result = completionService.take().get();
                          if (result != null)
                              Tasks.add(result.toTask());
                      }           
                  } catch (InterruptedException e) {
                  //  error :(
                  } catch (ExecutionException e) {
                  //  error :(
                  }
              }
              

              【讨论】:

                【解决方案9】:

                如果您想并行执行多个任务,然后按完成顺序处理它们,则基本上可以使用CompletionService。所以,如果我执行 5 个作业,CompletionService 会给我第一个完成的作业。除了提交Callable 的能力之外,只有一个任务的示例没有比Executor 赋予额外的价值。

                【讨论】:

                  【解决方案10】:

                  我认为 javadoc 最好地回答了 CompletionService 何时有用而 ExecutorService 没有用的问题。

                  一种将新异步任务的产生与已完成任务的结果的消费分离的服务。

                  基本上,此接口允许程序拥有创建和提交任务(甚至检查这些提交的结果)的生产者,而无需了解这些任务结果的任何其他消费者。同时,知道CompletionService 的消费者可以polltake 结果而不知道提交任务的生产者。

                  为了记录,我可能是错的,因为它已经很晚了,但我相当肯定该博客文章中的示例代码会导致内存泄漏。如果没有活跃的消费者从ExecutorCompletionService 的内部队列中取出结果,我不确定博主是如何预期该队列会耗尽的。

                  【讨论】:

                    猜你喜欢
                    • 1970-01-01
                    • 2019-06-15
                    • 2015-12-11
                    • 2012-02-12
                    • 2012-06-27
                    • 2010-11-29
                    • 2015-12-23
                    • 1970-01-01
                    相关资源
                    最近更新 更多