【问题标题】:JAVA Thread pool reusing threadsJAVA线程池复用线程
【发布时间】:2017-11-07 11:58:52
【问题描述】:

每个人。 我对使用线程池有误解。实际结果与该类的 API 描述不同。当我在线程池中使用LinkedBlockedQueue 时,它不重用线程,线程池等待构造函数中设置的 KeepAliveTime,然后终止该线程并创建一个新线程。当我将 KeepAliveTime 设置为较小时,例如 1 秒或更短,它会删除线程并重新创建它,但如果我设置一分钟,则不会创建新线程,因为 MaxPoolSize 不允许它并且队列已经满,因此所有任务都被拒绝,但是keepAliveTime 设置为分钟的线程这次什么都不做。我很新,不明白为什么它不重用这些线程。在keepTimeAlive 到期后,它会杀死这些线程,如果队列已满,它会创建一个新线程。为什么它会这样工作?据我从 API 了解,如果线程在 keepAliveTime 期间空闲,则必须重用它。当我使用SynchronousQueue 时,它会重用线程,而不是LinkedBlockingQueue

public class Main {

    private volatile int remainingTasksCount;
    private volatile static ThreadPoolExecutor consumer = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));

    private static Runnable task = () -> {
        System.out.println(String.format("consumer %s, id %s, size %s, active count %s, queue %s",
                Thread.currentThread().getName(), Thread.currentThread().getId(),
                consumer.getPoolSize(), consumer.getActiveCount(), 3-consumer.getQueue().remainingCapacity()));
        String s = new String();
        synchronized (s) {
            try {
                s.wait(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };


    public static void main(String[] args) throws IOException {
        try {
            new Thread(() -> {
                while (true) {
                    try {
                        for (int i = 0; i < 5; i++) {
                            consumer.submit(task);
                        }
                        System.out.println("PUSH TASKS");
                        synchronized (Thread.currentThread()) {
                            Thread.currentThread().wait(10000);
                        }
                    } catch (Throwable th) {
                        System.out.println(th);
                    }
                }
            }).start();
        } catch (Throwable th) {
            System.out.println(th);
        }
    }  

输出

PUSH TASKS
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 0
Disconnected from the target VM, address: '127.0.0.1:64434', transport: 'socket'

Process finished with exit code 1

但是下次生产者提交任务时,我得到RejectedExecutionException

如果我将keepAliveTime 更改为1 Second。一切运行良好,但创建 新线程。

PUSH TASKS
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
consumer pool-1-thread-3, id 17, size 1, active count 1, queue 2
PUSH TASKS
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 2
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0

如果有人能解释我的错误,或者我错过了什么基本原则,我会很高兴

【问题讨论】:

  • TL;DR - Executors.newCachedThreadPool() 怎么样?
  • 这就是我写的问题,SynchronousQueue 重用线程,但 LinkedBlockingQueue 不是(API 说任何blockingQueue 接口实现者重用),它创建的线程与您的任务一样多。 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
  • 您不应该管理线程池使用的队列。这就是问题所在。如果您使用@rkosegi 建议的内容,则队列由ExecutorService 管理。如果您尝试限制活动线程,请使用Executors.newFixedThreadPool(15)。这将只允许一次执行 15 个线程,但允许您将任意数量的作业排队。
  • 你在哪里看到我管理队列?每当我的程序执行时,我都不需要 15 个线程。它将每天加载 2-3 次。但这将是巨大的负担。所以我想动态增加线程数。我想,我可以制作更多的 corePoolSize 值和 allowCoreThreadTimeOut(true)。所以当线程空闲时它会减少线程数。当提交新任务并且线程池大小

标签: java multithreading threadpool


【解决方案1】:

这是一个竞争条件。如果您关注submit() 足够长的时间(在源代码中),您将到达ThreadPoolExecutor.execucte()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /* long comment block removed */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

当您的submit-loop 第一次运行时,execute 将创建新的工作人员,并为他们提供您的任务,而无需尝试将它们推入队列 (addWorker+return),因此 2 个任务立即开始,3个进入可以容纳所有3个的队列。

第二次,submit-s 会以 workQueue.offer 结束,这可能会使队列饱和(取决于工作人员尝试消费新项目的速度),当它饱和时, last-effort addWorker 将运行并失败,导致reject,因为不允许创建新的工作人员。

实际上,如果您在提交循环中开始做“事情”,它最终会开始工作。例如,我尝试println(i),这足够慢以至于消耗了一些任务并且循环成功。当我尝试print(i) 这已经太快了,它在第 4 次提交时就死了,所以很快就没有任务被消耗掉。所以这是一个微妙的问题,通常是什么竞争条件。

【讨论】:

  • 谢谢,关于线程池的工作原理,你是对的。我同意。但是您能否尝试在构造函数 TimeUnit.SECONDS 中进行设置,这样它才能正确且快速地工作。它会杀死已经执行任务的线程并重新创建新线程,但我不想创建新线程并重用它们,所以我将此参数设置为分钟
  • @qmalt 是的,这很正常:如果减少超时,线程将在 10 秒等待期间终止,并且模式将使用 create-2-threads-and-store-3 从头开始​​重新开始-tasks-in-3-slots,不会失败。
  • 我还是一头雾水。当线程正在等待新任务时,为什么线程池在保持活动期间不使用线程
  • @qmalt 线程池以您期望的方式使用线程,但它需要时间,因为它是异步发生的。简单地说,填满队列的速度可能比实际线程从中拉出任务的速度要快。请注意,这些工作线程是实际的操作系统线程,在您提交任务时它们完全处于空闲状态,因此它们需要唤醒并被调度。而提交者循环是一个非常快速和紧凑的循环,它位于一个正在运行的线程中。
  • 谢谢。我在第二次迭代中看到了这一点。所有任务都尝试进入队列,大小为 3,因此另外 2 个被拒绝。我认为在第二次迭代之前也将是第一次。但是源代码和您的建议对我有帮助!
【解决方案2】:

我认为由于您的代码示例,您对线程池的工作方式有一些误解。我尝试运行它并从 5 个任务和无限数量的 RejectedExecutionException 中获取输出。发生这种情况是因为在出现异常Thread.currentThread().wait(10000); 的情况下没有调用,另外 5 个任务被添加到池中,并且这个逻辑一次又一次地重复产生新的异常。尝试包围 consumer.submit(task);使用 try-catch 块,您将看到只有两个线程按预期处理所有任务,因为 keepTimeAlive 比等待时间长。在第二个示例中,keepTimeAlive 比等待时间短,因此在每次等待后都会创建新的非核心线程,并且在每次循环调用后您会看到不同的 id。这是正确的,因为之前的非核心线程已停止,因为它的空闲时间超过了 keepTimeAlive

【讨论】:

  • 将代码参数 TimeUnit.SECONDS 更改为 MINUTES 并在构造函数中设置值 1。你会看到你的任务也会被拒绝,尽管线程会空闲
  • 11 12 12 11 12 12 13 13 12 12. 这是你的程序的输出。在这里我们可以看到,线程池创建了新线程,因为您设置了太小的 keepAlive 参数。所以它正在等待一个任务,还没有收到它并且已经被终止,所以新的推送任务线程池创建了具有新 id 的新线程。但我只是将 keepAlive 参数更改为分钟。
  • 所以线程必须等待新任务,而不是它,线程没有收到新任务并等待keepAlive时间到期然后它将被终止并创建新线程,但在此期间什么都没有做线程池拒绝所有任务,因为它无法创建新线程(由于 maxPoolSize 参数)并且队列已满
  • 线程创建与 keepAlive 参数无关。您的主要问题是为什么在 5 个任务、2 个空闲线程和大小为 3 的阻塞队列的情况下会拒绝任务?
  • 查看主题标题。我的问题为什么 threadPool 不重用线程
猜你喜欢
  • 2013-04-19
  • 1970-01-01
  • 2018-10-02
  • 2011-08-10
  • 2011-11-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多