【问题标题】:Elegantly implementing queue length indicators to ExecutorServices优雅地为 ExecutorServices 实现队列长度指示器
【发布时间】:2011-01-16 23:25:40
【问题描述】:

为什么,哦,为什么java.util.concurrent 不为其ExecutorServices 提供队列长度指示器?最近我发现自己在做这样的事情:

ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...

public void addTaskToQueue(Runnable runnable) {
    if (queueLength.get() < MAX_QUEUE_LENGTH) {
        queueLength.incrementAndGet(); // Increment queue when submitting task.
        queue.submit(new Runnable() {
            public void run() {
                runnable.run();
                queueLength.decrementAndGet(); // Decrement queue when task done.
            }
        });
    } else {
        // Trigger error: too long queue
    }
}

这行得通,但是...我认为这确实应该作为ExecutorService 的一部分来实现。从实际队列中携带一个 分离 的计数器是愚蠢且容易出错的,计数器应该指示其长度(让我想起了 C 数组)。但是,ExecutorServices 是通过静态工厂方法获得的,因此无法简单地扩展其他出色的单线程执行器并添加队列计数器。那我该怎么办:

  1. 重新发明已经在 J​​DK 中实现的东西?
  2. 其他巧妙的解决方案?

【问题讨论】:

    标签: java concurrency queue executorservice executor


    【解决方案1】:

    还有更直接的方法:

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
    // add jobs
    // ...
    int size = executor.getQueue().size();
    

    虽然您可能会考虑不使用 Executor 的便捷创建方法,而是直接创建 executor 以摆脱强制转换,从而确保 executor 实际上始终是 ThreadPoolExecutor,即使实现Executors.newSingleThreadExecutor 总有一天会改变。

    ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
    

    这是直接从 JDK 1.6 中的Executors.newSingleThreadExecutor 复制而来的。传递给构造函数的LinkedBlockingQueue 实际上就是您将从getQueue 返回的对象。

    【讨论】:

    • 哈!因此,您只需要重新实现 JDK 的很小一部分(一行)。完全可以接受 :-) 谢谢。
    • 实际上我会自己选择演员,因为 ThreadPoolExecutor 是一个公共类,因此在某种程度上也是 newSingleThreadExecutor 的公共 API 的一部分。我认为由 newSingleThreadExecutor 创建的对象不太可能不再是 ThreadPoolExecutor,而不是 newSingleThreadExecutor 中的构造函数调用的参数有一天可能会改变,即使用一些更有效或更精确的实现或其他方式。
    • 具有讽刺意味的是,在我尝试过的一台机器上,我得到了 FinalizableDelegatedExecutorService 而不是 ThreadPoolExecutor
    • 查看the source,我不明白您如何将ExecutorService 直接转换为ThreadPoolExecutor。我也像往常一样不断收到ClassCastException
    【解决方案2】:

    虽然您可以直接检查队列大小。处理队列过长的另一种方法是使内部队列有界。

    public static
    ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
      return new ThreadPoolExecutor(nThreads, nThreads,
                                  5000L, TimeUnit.MILLISECONDS,
                                  new ArrayBlockingQueue<Runnable>(queueSize, true));
    }
    

    当您超过限制时,这将导致 RejectedExecutionExceptions(请参阅this)。

    如果你想避免异常调用线程可以被劫持执行函数。请参阅this SO question 获取解决方案。

    参考文献

    【讨论】:

      猜你喜欢
      • 2011-09-08
      • 1970-01-01
      • 1970-01-01
      • 2011-11-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-04-05
      相关资源
      最近更新 更多