【问题标题】:ExecutorService that scales threads then queues tasksExecutorService 扩展线程然后对任务进行排队
【发布时间】:2015-07-08 17:15:58
【问题描述】:

是否有ExecutorService 实现,其行为类似于具有以下特征的线程池?

  1. 始终至少有 X 个活动线程。
  2. 如果提交任务并且所有活动线程都忙,则会启动一个新线程,最多 Y 个线程。
  3. 如果提交了任务并且所有 Y 线程都忙,则该任务将排队。
  4. 如果没有提交新任务,池将缩减为 X 个活动线程。

相当标准的线程池行为。你会认为ThreadPoolExecutor 会处理这个问题,但是

executorService = new ThreadPoolExecutor(
    2, 10, // min/max threads
    60, TimeUnit.SECONDS, // time of inactivity before scaling back
    new SynchronousQueue<Runnable>()); // task queue

如果提交的任务超过 10 个,将抛出异常。切换到LinkedBlockingQueue 将永远不会超过两个最小线程,除非您像new LinkedBlockingQueue&lt;Runnable&gt;(20) 这样限制大小,在这种情况下,将有两个线程处理 1-20 个任务,2-10 个线程处理 21-30 个任务,超过 30 个任务的例外。不漂亮。同时,固定线程池永远不会缩减非活动线程。

那么,为了得到我想要的东西,我可以使用不同类型的BlockingQueue 或摆弄我错过的其他设置吗?是否有另一个更适合的ExceutorService 实现(哪个?),或者我最好通过覆盖ThreadPoolExecutorexecute() 方法来滚动自己的实现?

【问题讨论】:

  • 我不认为一些不活跃的线程会造成太大的伤害?
  • 我也不明白您对非活动线程的担忧。 Afaik,他们坐在那里休息,几乎没有任何间接费用。

标签: java multithreading executorservice


【解决方案1】:

这应该可以解决问题:

    ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, maxPoolSize, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    tpe.setRejectedExecutionHandler((
            Runnable r,
            ThreadPoolExecutor executor) -> {
        // this will block if the queue is full but not until the sun will rise in the west!
            try {
                if(!executor.getQueue().offer(r, 15, TimeUnit.MINUTES)){
                    throw new RejectedExecutionException("Will not wait infinitely for offer runnable to ThreadPoolExecutor");
                }
            } catch (InterruptedException e) {
                throw new RejectedExecutionException("Unable to put runnable to queue", e);
            }
        });

【讨论】:

  • Runnable 直接提交到执行器的内部队列似乎是不可取的。根据文档:“方法 getQueue() 允许访问工作队列以进行监视和调试。强烈建议不要将此方法用于任何其他目的。”
【解决方案2】:

实际上SynchronousQueue 的大小基本上为零,因此,当您提交超过 10 个任务(最大池大小)时,拒绝政策就会启动,任务会被拒绝。此外,对于每个提交的任务,它都会获得一个线程并使用它。

当您使用大小为 20 的 LinkedBlockingQueue 时,如果已达到核心大小并且没有空闲线程,它将排队任务。他们将继续排队,除非队列已满,在这种情况下,它将创建一个新线程用于新任务。当池大小达到最大值时,任何新提交都将采用拒绝政策。

您希望每当提交任务并且核心(核心大小)中的所有线程都忙时,您的任务应该获得一个新线程,而不是在队列中等待,我认为这是不可能的。

【讨论】:

  • @Brett 你是对的,我只是犯了一个错误。编辑了答案。
【解决方案3】:

很遗憾,答案是否定的。关于 jre 中的内容,您能做的最好的事情就是有效地没有线程地板,只有天花板。这可以通过allowing core threads to timeout 来完成。

ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 60, TimeUnit.Seconds, new LinkedBlockingQueue<Runnable>());
tpe.allowCoreThreadTimeOut(true);

因为核心大小为10,所以在提交任务时会启动一个新线程,直到有10个线程处于活动状态。之后,任务将在 LinkedBlockingQueue 中排队。如果一个线程在 60 秒内处于非活动状态,它将终止。

您想要的行为可以通过编写一个实现 BlockingQueue 和 RejectedExecutionHandler 的类来实现,该类在确定任务应该添加到队列中还是被拒绝之前检查 ThreadPoolExecutors 当前状态。

【讨论】:

  • 这正是我所需要的,尽管我使用了一个有限大小的 ArrayBlockingQueue。我要补充一点,如果您预期某个最小负载,您可以使用 ExecutorService.prestartAllCoreThreads() 并调整您的空闲超时以模仿“始终在线”的核心线程行为。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-12-12
  • 1970-01-01
  • 2016-08-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多