【问题标题】:Java executors: wait for task termination. [duplicate]Java 执行器:等待任务终止。 [复制]
【发布时间】:2010-11-22 06:29:18
【问题描述】:

我需要提交一些任务,然后等待它们,直到所有结果都可用。他们每个人都将String 添加到Vector(默认情况下是同步的)。然后我需要为 Vector 中的每个结果启动一个新任务,但只有在所有先前的任务都停止工作时我才需要这样做。

我想使用 Java Executor,特别是我尝试使用 Executors.newFixedThreadPool(100) 以使用固定数量的线程(我有可变数量的任务,可以是 10 或 500)但我是新的 executors 和我不知道如何等待任务终止。 这类似于我的程序需要执行的伪代码:

ExecutorService e = Executors.newFixedThreadPool(100);
while(true){

/*do something*/

for(...){
<start task>
}

<wait for all task termination>

for each String in result{
<start task>
}

<wait for all task termination>
}

我无法执行 e.shutdown,因为我有一段时间(真的),我需要重用 executorService...

你能帮帮我吗?你能给我推荐一本关于 java executors 的指南/书吗?

【问题讨论】:

标签: java multithreading concurrency executorservice


【解决方案1】:

你能给我推荐一本关于 java执行器??

这部分我可以回答:

Java Concurrency in Practice by Brian Goetz(与 Tim Peierls、Joshua Bloch、Joseph Bowbeer、David Holmes 和 Doug Lea)很可能是您的最佳选择。

这不仅仅是关于执行器,而是涵盖了java.util.concurrent 包的一般性,以及基本的并发概念和技术,以及一些高级主题,例如 Java 内存模型。

【讨论】:

  • 这是一本非常棒的书,虽然它不适合初学者。
【解决方案2】:

ExecutorService 为您提供了一种机制,可以同时执行多个任务并返回 Future 对象的集合(表示任务的异步计算)。

Collection<Callable<?>> tasks = new LinkedList<Callable<?>>();
//populate tasks
for (Future<?> f : executorService.invokeAll(tasks)) { //invokeAll() blocks until ALL tasks submitted to executor complete
    f.get(); 
}

如果您有Runnables 而不是Callables,则可以使用以下方法轻松地将Runnable 转换为Callable&lt;Object&gt;

Callable<?> c = Executors.callable(runnable);

【讨论】:

  • 抱歉挖掘了一个旧帖子,但我看不出调用 f.get(); 的意义。 invokeAll() 本身是阻塞的,因此除非您对结果感兴趣(您的代码不感兴趣),否则无需调用 get()。
  • @hooch:IMO 你应该总是打电话给get(),以防你的Callable 抛出异常。否则它就会丢失。
【解决方案3】:
ExecutorService executor = ...
//submit tasks
executor.shutdown(); // previously submitted tasks are executed, 
                     // but no new tasks will be accepted
while(!executor.awaitTermination(1, TimeUnit.SECONDS))
    ;

如果不创建自定义 ExecutorService,就没有简单的方法来做你想做的事。

【讨论】:

    【解决方案4】:

    而不是直接提交Runnables 或Callables 到Executor 并存储相应的Future 返回值我建议使用CompletionService 实现来检索每个Future when它完成了。这种方法将任务的生产与已完成任务的消耗分离,例如,允许在一段时间内在生产者线程上发起新任务。

    Collection<Callable<Result>> workItems = ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletionService<Result> compService = new ExecutorCompletionService<Result>(executor);
    
    // Add work items to Executor.
    for (Callable<Result> workItem : workItems) {
      compService.submit(workItem);
    }
    
    // Consume results as they complete (this would typically occur on a different thread).
    for (int i=0; i<workItems.size(); ++i) {
      Future<Result> fut = compService.take(); // Will block until a result is available.
      Result result = fut.get(); // Extract result; this will not block.
    }
    

    【讨论】:

    • 在实践中,这比我上面的例子涉及更多的 LOC。如果您从多个位置提交任务但想要一致地处理任务完成(例如,执行一些其他计算),CompletionService 通常很有用 - 您只想定义一次
    • @oxbow:或者如果您想在第一个任务完成后立即开始处理结果!否则,您可能正在等待最慢的任务,而其他任务已经完成......(Adamski +1)
    • @Tim - OP 很清楚地说他想等到所有任务都完成,所以哪个任务先完成没有区别(除了几纳秒)。
    • 这种方法让我思考如何改进我的代码。也许我可以更快地做类似的事情。这让我想到了另一个问题:我不确定我的任务是否会结束,那么如何添加一个排序超时??
    • @Raffale:为了清楚起见,可能值得将其作为一个单独的问题发布(除非已经存在)。
    【解决方案5】:

    当您提交到执行器服务时,您会得到一个 Future 对象。

    将这些对象存储在一个集合中,然后依次调用每个对象上的get()get() 阻塞,直到底层作业完成,因此结果是在所有底层作业完成后对每个调用 get() 将完成。

    例如

    Collection<Future> futures = ...
    for (Future f : futures) {
       Object result = f.get();
       // maybe do something with the result. This could be a
       // genericised Future<T>
    }
    System.out.println("Tasks completed");
    

    所有这些都完成后,开始您的第二次提交。请注意,这可能不是对线程池的最佳使用,因为它将变为休眠状态,然后您将重新填充它。如果可能的话,试着让它忙着做事。

    【讨论】:

    • e.submit(我想我需要按照你的例子使用这个)和e.execute有什么区别??
    • 不同之处在于,提交后你会得到一个未来,而执行你不会。如果您使用自己的ThreadFactoryUncaughtExceptionHandler,则execute 将导致处理程序接收任何未捕获的异常,而submit 不会 - 您只能通过Future 的@987654331 获得异常@方法
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-08
    • 2011-06-24
    • 2019-10-08
    • 2014-08-17
    相关资源
    最近更新 更多