【问题标题】:Wait for all tasks (unknown number) to finish - ThreadPoolExecutor等待所有任务(未知数)完成 - ThreadPoolExecutor
【发布时间】:2012-04-13 18:08:12
【问题描述】:

我遇到了这个困扰了我好几天的并发问题。

基本上,我希望我的 ThreadPoolExecutor 在关闭之前等待所有任务(任务数未知)完成。

public class AutoShutdownThreadPoolExecutor extends ThreadPoolExecutor{
    private static final Logger logger = Logger.getLogger(AutoShutdownThreadPoolExecutor.class);
    private int executing = 0;
    private ReentrantLock lock = new ReentrantLock();
    private final Condition newTaskCondition = lock.newCondition(); 
    private final int WAIT_FOR_NEW_TASK = 120000;

    public AutoShutdownThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        lock.lock();
        executing++;
        lock.unlock();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        try{
            lock.lock();
            int count = executing--;
            if(count == 0) {
                newTaskCondition.await(WAIT_FOR_NEW_TASK, TimeUnit.MILLISECONDS); 
                if(count == 0){
                    this.shutdown();
                    logger.info("Shutting down Executor...");
                }
            }
        }
        catch (InterruptedException e) {
            logger.error("Sleeping task interrupted", e);
        }
        finally{
            lock.unlock();
        }
    }
}

目的是任务检查任务计数器(正在执行),如果等于0,则阻塞一段时间,稍后释放它的锁,以便其他任务有机会执行而不是关闭执行者太早了。

然而,这并没有发生。 executor中的4个线程全部进入等待状态:

"pool-1-thread-4" prio=6 tid=0x034a1000 nid=0x2d0 waiting on condition [0x039cf000]
"pool-1-thread-3" prio=6 tid=0x034d0400 nid=0x1328 waiting on condition [0x0397f000]
"pool-1-thread-2" prio=6 tid=0x03493400 nid=0x15ec waiting on condition [0x0392f000]
"pool-1-thread-1" prio=6 tid=0x034c3800 nid=0x1fe4 waiting on condition [0x038df000]

如果我在 Runnable 类中添加一个日志语句(应该会降低线程速度),问题似乎就消失了。

public void run() {
    //  logger.info("Starting task" + taskName);
        try {
            //doTask();
        }
        catch (Exception e){
            logger.error("task " + taskName + " failed", e);
        }
}

问题与这篇文章类似 Java ExecutorService: awaitTermination of all recursively created tasks

我已采用原始海报解决方案并尝试在 afterExecute() 中解决竞争条件,但它不起作用。

请帮助阐明这一点。 谢谢。

【问题讨论】:

  • 使用 CountDownLatch 怎么样?
  • 任务数事先未知。该程序扫描一个目录以查找其子文件夹中的所有文件,并对每个文件执行一些数字运算任务。

标签: java concurrency


【解决方案1】:

你有你的任务在等待这个newTaskCondition,但没有任何迹象表明这种情况。所以你的线程都堆积起来了,都在等待newTaskCondition,直到超时。另外,阻塞afterExecute 会延迟任务的完成,所以这可能不是你想做的。

如果您想让它等待一段时间以查看是否有更多工作进入,则此功能已经存在。只需调用setKeepAliveTime设置等待多长时间,设置allowCoreThreadTimeOut确保所有线程(包括核心线程)都可以终止。

【讨论】:

  • 这就是目的。检查是否没有更多任务 -> 等待并释放锁一段时间,(如果有新任务,他们应该已经获得了这个锁) -> 唤醒并重新获取锁 -> 再次检查是否有任务仍然是 0 -> 如果是,则关闭执行程序,如果不退出任务。我猜在这种情况下不需要发出信号让线程唤醒?
  • @TommyQ,这就是正在发生的事情 - 一个任务运行,它看到没有更多的线程,它开始等待 120 秒。重复四次,现在您有四个等待任务。在计时器到期之前,什么都不会唤醒它们,如果你因为它们忙于等待 120 秒而用完线程,那么,太糟糕了。内置的保活定时器有什么问题?
  • setKeepAliveTime 只适用于多余的线程?我设置corePoolsize = maxPoolsize;
【解决方案2】:

我找到了一个非常简单的解决方案来解决我的问题。

虽然事先不知道任务的数量,目的是让主线程等待threadPool中的所有任务完成,但是所有任务提交的时间是已知的(在find-and-submit之后- task 方法已扫描所有文件)。

我可以简单地调用 ThreadPoolExecutor.shutdown() 和 awaitTermination(Long.MAX_VALUE) 以便主线程无限期地等待 ThreadPool 完成其任务。

【讨论】:

    猜你喜欢
    • 2020-05-13
    • 1970-01-01
    • 1970-01-01
    • 2023-03-26
    • 2021-06-12
    • 2011-03-17
    • 2023-04-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多