【问题标题】:ThreadPoolExecutor with corePoolSize 0 should not execute tasks until task queue is fullcorePoolSize 为 0 的 ThreadPoolExecutor 不应该在任务队列满之前执行任务
【发布时间】:2019-02-14 14:45:44
【问题描述】:

我正在阅读 Java Concurrency In Practice 并被困在 8.3.1 线程创建和拆卸 主题上。以下脚注警告将corePoolSize 保持为零。

开发人员有时会尝试将核心大小设置为零,以便工作线程能够 最终被拆除,因此不会阻止 JVM 退出,但这可能会导致一些 不使用 SynchronousQueue 作为工作队列的线程池中的奇怪行为 (正如 newCachedThreadPool 所做的那样)。 如果池已经达到核心大小,ThreadPoolExecutor 创建 仅当工作队列已满时才创建新线程。所以任务提交到带有工作队列的线程池 具有任何容量且核心大小为零的队列在队列填满之前不会执行,这通常是 不是我们想要的。

为了验证这一点,我编写了这个程序,但它不能按上述方式运行。

    final int corePoolSize = 0;
    ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());

    // If the pool is already at the core size
    if (tp.getPoolSize() == corePoolSize) {
        ExecutorService ex = tp;

        // So tasks submitted to a thread pool with a work queue that has any capacity
        // and a core size of zero will not execute until the queue fills up.
        // So, this should not execute until queue fills up.
        ex.execute(() -> System.out.println("Hello"));
    }

输出你好

那么,程序的行为是否表明ThreadPoolExecutor 在提交任务时至少创建一个线程,而与corePoolSize=0 无关。如果是,那么教科书中的警告是什么。

编辑:根据@S.K. 的建议对 jdk1.5.0_22 中的代码进行了以下更改:

ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1));//Queue size is set to 1.

但是随着这个改变,程序终止而不打印任何输出。

那么我是否误解了书中的这些陈述?

编辑 (@sjlee): 很难在评论中添加代码,所以我将它作为编辑添加在这里...你可以尝试这个修改并针对两个最新的JDK和JDK 1.5?

final int corePoolSize = 0;
ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// If the pool is already at the core size
if (tp.getPoolSize() == corePoolSize) {
    ExecutorService ex = tp;

    // So tasks submitted to a thread pool with a work queue that has any capacity
    // and a core size of zero will not execute until the queue fills up.
    // So, this should not execute until queue fills up.
    ex.execute(() -> System.out.println("Hello"));
}
tp.shutdown();
if (tp.awaitTermination(1, TimeUnit.SECONDS)) {
    System.out.println("thread pool shut down. exiting.");
} else {
    System.out.println("shutdown timed out. exiting.");
}

@sjlee 已将结果发布到 cmets。

【问题讨论】:

  • @sjlee jdk 1.5 的输出是 thread pool shut down. exiting.,jdk 1.8 的输出是 Hello thread pool shut down. exiting.
  • 好的,谢谢。正如您在下面发现的那样,JDK 在这方面似乎将行为从 1.5 略微更改为 1.6。
  • @sjlee 但是任何java版本中的代码都不会推断教科书中提到的文本。我是否错误地解释了文本?根据我在问题中的示例,即使任务队列已满corePoolSize =0,任务也不会执行。我无法理解那一点。
  • 本例中的“教科书”是Java Concurrency in Practice?除非不断更新,否则技术书籍一直都过时了。我只是认为这在出版时是正确的,但现在已经不是这样了。更重要的是javadoc(API doc)。我相信当核心池大小 = 0 时,API 文档非常模糊,所以这并不是合同的真正中断。至于任务队列已满且核心池大小 = 0,您能否发布一个代码示例来说明这一点?
  • @sjlee 我已经在我的问题中做了。看jdk1.5的测试结果。这里,核心池大小=0,任务队列大小=1,提交的任务数量=1。

标签: java multithreading concurrency threadpool java-threads


【解决方案1】:

当核心池大小为零时,ThreadPoolExecutor 在 Java 5 中的这种奇怪行为显然被认为是一个错误,并在 Java 6 中悄悄修复。

确实,由于 Java 7 中的一些代码在 6 和 7 之间进行了修改,该问题再次出现。然后它被报告为错误,被确认为错误并修复。

无论哪种方式,您都不应该使用受此错误影响的 Java 版本。 Java 5 已于 2015 年结束生命周期,Java 6 及更高版本的最新可用版本不受影响。 “Java 并发实践”那部分不再合适。

参考资料:

【讨论】:

    【解决方案2】:

    在 jdk 1.5、1.6、1.7 和 1.8 中运行此程序时,我发现 ThreadPoolExecutor#execute(Runnable) 在 1.5、1.6 和 1.7+ 中的不同实现。这是我发现的:

    JDK 1.5 实现

     //Here poolSize is the number of core threads running.
    
     public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            if (runState != RUNNING) {
                reject(command);
                return;
            }
            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                return;
            if (workQueue.offer(command))
                return;
            Runnable r = addIfUnderMaximumPoolSize(command);
            if (r == command)
                return;
            if (r == null) {
                reject(command);
                return;
            }
            // else retry
        }
    }
    

    此实现在corePoolSize 为0 时不会创建线程,因此提供的任务不会执行。

    JDK 1.6 实现

    //Here poolSize is the number of core threads running.
    
      public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }
    

    JDK 1.6 会创建一个新线程,即使 corePoolSize 为 0。

    JDK 1.7+ 实现(类似于 JDK 1.6,但具有更好的锁和状态检查)

        public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    

    JDK 1.7 也会创建一个新线程,即使 corePoolSize 为 0。

    所以,corePoolSize=0 似乎在 JDK 1.5 和 JDK 1.6+ 的每个版本中都是一个特例。

    但奇怪的是,书中的解释与任何程序结果都不匹配。

    【讨论】:

    • 当您已经承认 Java 5 的结果确实匹配时,我不明白您是如何得出“本书的解释与任何程序结果都不匹配”的结论描述。
    • @Holger 我在 Java 5 中运行了以下代码,预计会打印 Hello 但它没有 final int corePoolSize = 0; ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,new LinkedBlockingQueue&lt;&gt;(1));//Queue size is set to 1. if (tp.getPoolSize() == corePoolSize) { ExecutorService ex = tp; ex.execute(() -&gt; System.out.println("Hello")); }
    • 正如书中所说。那你为什么说“这本书的解释与任何程序结果都不匹配”?
    【解决方案3】:

    似乎这是旧 Java 版本的错误,但它现在在 Java 1.8 中不存在。

    根据来自ThreadPoolExecutor.execute() 的 Java 1.8 文档:

         /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * ....
         */
    

    在第二点,在将工作人员添加到队列后进行重新检查,是否可以启动新线程而不是排队任务,而不是回滚排队并启动新线程。

    这就是正在发生的事情。在第一次检查期间,任务已排队,但在重新检查期间,会启动一个新线程来执行您的任务。

    【讨论】:

    • 我在 jdk1.6.0_45 中运行了这个程序,它给出了相同的输出。
    • 看来问题可能出在 jdk 1.5 中。我只能在 google 上获得这个链接,它暗示这个问题在 1.6 中得到修复:cs.oswego.edu/pipermail/concurrency-interest/2006-December/…
    • 我添加了jdk1.5.0_22的测试结果。请参阅问题中的更新。 IMO,如果它是一个错误,它会在教科书本身中提到。
    • 我添加了一个合理的解释。看看吧。
    猜你喜欢
    • 2019-07-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-08-23
    相关资源
    最近更新 更多