【问题标题】:How can I check if all tasks have been completed (normally or abruptly)?如何检查所有任务是否已完成(正常或突然)?
【发布时间】:2015-08-31 09:56:07
【问题描述】:

我有以下课程:

public class Service{
    private static final ExecutorService executor = Executors.newFixedThreadPool(4);

    public synchronized void execute(Collection<Runnable> runs){
        for(Runnable r: runs){
            executor.execute(r);
        }
    }

    public boolean isComplete(){
        //should return true iff all tasks applied by the execute(Collection<Runnable> runs) 
        //method're finished their execution (normally or abruptly, 
        //it doesn matter)
    }
}

如何实现isComplete() 方法。我需要检查是否有当前正在进行的任务。如果执行器被清除(所有任务都完成),那么方法应该返回true,否则返回false。

【问题讨论】:

  • 在您的代码示例中,execute 是否同步?
  • @AndyBrown 在我提供的代码中,没有。但我打算按以下方式使用它:一旦执行器开始执行任务,任何其他线程都不可能调用执行方法,直到所有Runnables 完成。
  • 这意味着您可以重新使用Service。我很想不这样做,因为它是如此轻量级 - 只是让它只允许单次提交,然后在完成后被丢弃。否则,您将获得不完整的服务,然后是完整的,然后是不完整的...... - 这可能会导致问题。
  • @AndyBrown 事情是ServiceApplication Scope,所以我不认为我会以不完整的服务告终......
  • 我有强烈的感觉,你正在重塑invokeAll...

标签: java multithreading threadpoolexecutor


【解决方案1】:

鉴于您使用的是ExecutorService,您可以使用submit 而不是execute,并将返回的Future 存储在一个列表中。然后在isComplete() 中遍历所有Futures 并在每个上调用isDone()。 (这种方法还可以让您在需要时通过Futures 取消提交的任务。

例如

class Service{
    private static final ExecutorService executor = Executors.newFixedThreadPool(4);
    private List<Future<?>> futures;
    public void execute(Collection<Runnable> runs){
        futures = runs.stream()
                .map(r -> executor.submit(r))
                .collect(Collectors.toList());
    }

    public boolean isComplete(){
        for (Future<?> future : futures) 
            if (!future.isDone()) return false;

        return true;
    }
}

根据您的用例,您可以通过从futures 列表中删除项目来获得更好的性能,但您(可能)需要同步isComplete 方法:

    public synchronized boolean isComplete(){
        Iterator<Future<?>> itr = futures.iterator();
        while (itr.hasNext()) {
            if (itr.next().isDone()) itr.remove();
            else return false;
        }
        return true;
    }

在编写此代码示例时,它假定您只会对execute 的每个Service 实例进行一次调用,因此它不需要是@987654337 @。如果您在每个 Service 实例上有多个并发调用者 execute,请注意这将替换每次调用 execute 时的 futures 列表。您可以通过使该类仅供一次性使用或附加到期货来处理该问题。这完全取决于您的用例。

【讨论】:

  • 我正在编写相同的代码,但您发布得更早。我想知道isComplete 是否应该同步。顺便说一句,你需要使用Iterator,否则futures.remove会抛出ConcurrentModificationException
  • 好建议,谢谢。顺便说一句,将List&lt;Future&lt;?&gt;&gt; 包装成单个Future&lt;?&gt; 是否正确。这个问题与我提出的问题没有直接关系......
  • @St.Antario - 有趣。从概念上讲,Future 代表 "the result of an asynchronous computation"。你可以争辩说你可以把它分解成更小的异步任务(类似于ForkJoinPool),所以,是的,相关的Futures 可以被包装成一个Future
【解决方案2】:

您可以调用shutdown() 方法要求执行器终止所有线程并关闭池。然后调用isTerminated(),如果所有线程都终止,则返回true。

如果要阻塞执行,可以使用awaitTerminationMethod(),最后shutDownNow()将终止池,不管线程是否完成执行。

【讨论】:

  • executor 是一个类变量,因此可能在许多服务之间共享以提供一个公共线程池。调用 shutdown() 将杀死所有 Service 实例,而不仅仅是调用 isComplete() 的实例。
  • 那么使用包裹在 Executor 周围的 CompletionService 来正确处理 Future 怎么样?
  • 我喜欢CompletionService - 它很容易被忽视,但它需要花点功夫才能在这里工作?
【解决方案3】:

CountDownLatch 解决了我的问题。

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // ...
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
 
// wait for the latch to be decremented by the two remaining threads
latch.await();

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-10-18
    • 1970-01-01
    • 2018-04-18
    • 2014-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多