【问题标题】:Want ThreadPoolExecutor to execute tasks instantly希望 ThreadPoolExecutor 立即执行任务
【发布时间】:2014-07-02 08:45:35
【问题描述】:

我有一个 ThreadPoolExecutor,其中一个线程将用于批处理,因此在将新任务分配给执行程序之前,我必须等待较早的任务完成,我是根据活动作业的值来执行此操作的,但仔细看我发现,执行者不会立即执行任务。

这对我造成的问题是我准备好下一批但第一个任务尚未开始,因此活动作业的值为 0。

我怎样才能立即运行任务。我也可以接受任何其他执行者或可以做到这一点的方式。

【问题讨论】:

  • 当你 submit() 任务到线程池执行器时,你会得到一个 Future。存储该返回值,您可以等待该 Future 以确保您之前的任务已完成。但是,在提交新的批处理作业之前,您必须等待之前的任务完成有什么特别的原因吗?
  • 好吧,如果你有一个带有 1 个线程的池,并且你需要在提交下一个任务之前等待,那么你不需要任何线程池...
  • 我不太明白你想要达到什么目的:如果你想等待前面的任务完成,你为什么要使用执行器?
  • 场景是我有很多数据会被处理然后写入文件,所以这一切都将分批完成。因此,一旦我处理了固定数量的数据,它就会被提供给作为执行程序的编写器线程并继续处理更多数据,但是在我将下一批数据提供给编写器之前,我应该等待较早的工作,因为他们可能会写到同一个文件。
  • @user996808 如果您有一个只有 1 个线程的 ThreadPoolExecutor,您的作业将按照您提交作业的顺序执行,一次一个作业。如果当前作业没有完成,执行程序自然会将作业排队 - 因此在这种情况下,您无需在提交新作业之前等待较早的作业。

标签: java executorservice threadpoolexecutor


【解决方案1】:

您可能应该使用ExecutorService 中的submit 方法来安排您的任务。这是一个使用单线程执行器运行 10 个任务的工作程序。我投到ThreadPoolExecutor 来监控线程池状态。您可以通过在其对应的Future 实例上调用get 来等待单个任务,或者通过调用awaitTermination 来等待所有任务。如果您不需要来自Future 的结果,只需使用Void。希望对您有所帮助。

public class Main {                                                                                                                             
    static class TimingCallable implements Callable<Long> {                                                                                     
        static int MIN_WAIT = 200;                                                                                                              
        @Override                                                                                                                               
        public Long call() {                                                                                                                    
            long start = System.currentTimeMillis();                                                                                            
            try {                                                                                                                               
                Thread.sleep(MIN_WAIT + new Random().nextInt(300));                                                                             
            } catch (InterruptedException e) {                                                                                                  
                //DO NOTHING                                                                                                                    
            }                                                                                                                                   
            return System.currentTimeMillis() - start;                                                                                          
        }                                                                                                                                       
    }                                                                                                                                           

    public static void main(String[] args) throws InterruptedException, ExecutionException {                                                    

        ExecutorService executor =  Executors.newFixedThreadPool(1);                                                                            
        @SuppressWarnings("unchecked")                                                                                                          
        Future<Long>[] futureResults = new Future[10];                                                                                          
        for(int i =0; i < futureResults.length; i++) {                                                                                          
            futureResults[i] = executor.submit(new TimingCallable());                                                                           
            System.out.println(String.format("ActiveCount after submitting %d tasks: ", i+1) + ((ThreadPoolExecutor)executor).getActiveCount());
            System.out.println(String.format("Queue size after submitting %d tasks: ", i+1) + ((ThreadPoolExecutor)executor).getQueue().size());
        }                                                                                                                                       
        Thread.sleep(2000);                                                                                                                     
        System.out.println("ActiveCount after 2 seconds: " + ((ThreadPoolExecutor)executor).getActiveCount());                                  
        System.out.println("Queue size after 2 seconds: " + ((ThreadPoolExecutor)executor).getQueue().size());                                  
        for(int i =0; i < futureResults.length; i++) {                                                                                          
            if (futureResults[i].isDone()) {                                                                                                    
                System.out.println(String.format("%d task is done with execution time: ", i) + futureResults[i].get());                         
            }                                                                                                                                   
        }                                                                                                               //Waiting for the last task to finish
        System.out.println("Waiting for the last task result: " + futureResults[9].get());
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);                                  
    }                                                                                                                                           
}                                                                                                                                               

【讨论】:

    【解决方案2】:

    如果您只有一个线程要执行,只需使用 LinkedQueue 来存储作业,一旦线程完成执行,那么只有它会选择另一个任务。

    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
    

    如果你限制大小,你也可以有不同的策略

    http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

    读取被拒绝的任务

    【讨论】:

    • 但是我怎么能等待第一个任务完成
    【解决方案3】:

    单线程池执行器服务

    显然您想立即运行多个任务,但要按照提交的顺序。

    很简单:使用由单个线程支持的 executor service。执行器在等待较早的任务完成时缓冲任务。在线程池中的单个线程上,一次只能执行一个任务,因此它们将按照提交的顺序依次完成。

    Executors 类提供了支持执行程序服务的几个不同线程池的选择。你想要Executors.newSingleThreadExecutor()

    ExecutorService es = Executors.newSingleThreadExecutor() ;
    

    提交一系列RunnableCallable 对象。每个代表一个要执行的任务。

    es.submit(  ( ) -> System.out.println( "Hello. " + Instant.now() )  ) ;
    es.submit(  ( ) -> System.out.println( "Bonjour. " + Instant.now() )  ) ;
    es.submit(  ( ) -> System.out.println( "Aloha. " + Instant.now() )  ) ;
    es.submit(  ( ) -> System.out.println( "Ciào. " + Instant.now() )  ) ;
    es.submit(  ( ) -> System.out.println( "Shwmai. " + Instant.now() )  ) ;
    

    如果您想跟踪任务的完成情况,可以选择捕获每次调用submit 返回的Future 对象。 (上面的代码中没有显示)

    看到这个code run live at IdeOne.com

    你好。 2019-11-29T09:10:13.426987Z

    你好。 2019-11-29T09:10:13.472719Z

    阿罗哈。 2019-11-29T09:10:13.473177Z

    Ciào。 2019-11-29T09:10:13.473479Z

    Shwmai。 2019-11-29T09:10:13.473974Z

    【讨论】: