【问题标题】:Java ThreadPoolExecutor strategy, 'Direct Handoff' with queue?Java ThreadPoolExecutor 策略,带有队列的“直接切换”?
【发布时间】:2012-03-26 05:27:43
【问题描述】:

我希望有一个ThreadPoolExecutor,我可以在其中设置一个corePoolSize 和一个maximumPoolSize,然后队列会立即将任务移交给线程池,从而创建新线程,直到它到达maximumPoolSize 然后开始添加到队列中。

有这种事吗?如果没有,是否有充分的理由没有这样的策略?

我想要的本质上是提交任务以供执行,当它到达一个点,它基本上会因为拥有太多线程而获得“最差”的性能(通过设置 maximumPoolSize),它会停止添加新线程并使用该线程池并开始排队,然后如果队列已满,则拒绝。

当负载恢复时,它可以开始将未使用的线程分解回 corePoolSize。

这对我来说比http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html中列出的“三种通用策略”更有意义

【问题讨论】:

标签: java threadpool


【解决方案1】:

注意:这些实现有些缺陷和不确定性。在使用此代码之前,请阅读完整的答案和 cmets。

如何创建一个工作队列,在执行程序低于最大池大小时拒绝项目,并在达到最大值后开始接受它们?

这取决于记录在案的行为:

"如果一个请求不能入队,就会创建一个新线程,除非这 将超过 maximumPoolSize,在这种情况下,任务将 被拒绝。”

public class ExecutorTest
{
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int KEEP_ALIVE_TIME_MS = 5000;

    public static void main(String[] args)
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue();

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 6; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread());
                }
            });
        }
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private ThreadPoolExecutor executor;

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (executor.getPoolSize() < executor.getMaximumPoolSize())
            {
                return false;
            }
            return super.offer(e);
        }
    }
}

注意:您的问题让我感到惊讶,因为我希望您期望的行为是配置为 corePoolSize


想法 #2

我想我有一个可能稍微好一点的方法。它依赖于在ThreadPoolExecutor 中编码到setCorePoolSize 方法中的副作用行为。这个想法是在工作项入队时临时和有条件地增加核心池大小。当增加核心池大小时,ThreadPoolExecutor 将立即产生足够的新线程来执行所有排队的 (queue.size()) 任务。然后我们立即减小核心池大小,这允许线程池在未来的低活动期间自然收缩。这种方法仍然不是完全确定的(例如,池大小有可能增长到最大池大小以上),但我认为它几乎在所有情况下都比第一种策略更好。

具体来说,我认为这种方法比第一种方法更好,因为:

  1. 它将更频繁地重用线程
  2. 它不会因为竞争而拒绝执行
  3. 我想再次提一下,第一种方法会导致线程池增长到最大大小,即使在非常少的使用情况下也是如此。这种方法在这方面应该更有效。

-

public class ExecutorTest2
{
    private static final int KEEP_ALIVE_TIME_MS = 5000;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;

    public static void main(String[] args) throws InterruptedException
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE);

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 60; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread()
                            + " poolSize: " + executor.getPoolSize());
                }
            });
        }

        executor.shutdown();

        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private final int corePoolSize;
        private final int maximumPoolSize;
        private ThreadPoolExecutor executor;

        public SaturateExecutorBlockingQueue(int corePoolSize, 
                int maximumPoolSize)
        {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
        }

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (super.offer(e) == false)
            {
                return false;
            }
            // Uncomment one or both of the below lines to increase
            // the likelyhood of the threadpool reusing an existing thread 
            // vs. spawning a new one.
            //Thread.yield();
            //Thread.sleep(0);
            int currentPoolSize = executor.getPoolSize();
            if (currentPoolSize < maximumPoolSize 
                    && currentPoolSize >= corePoolSize)
            {
                executor.setCorePoolSize(currentPoolSize + 1);
                executor.setCorePoolSize(corePoolSize);
            }
            return true;
        }
    }
}

【讨论】:

  • 嗯,你的建议就是我在现在删除的答案的评论中所说的。反正。我想你会发现压倒一切的报价可能会导致问题,因为它没有锁。即使只有3条指令?或者这么久。
  • @RonaldChan 我也担心同样的事情,我一直在尝试解决潜在的问题,但我还没有解决任何问题。我会继续努力思考。
  • 无论如何都接受你的回答。这就是我最终实现的,而不是尝试重写 ThreadPoolExectuor 的执行方法,这要复杂得多。仅供参考,我还在 maximumPoolSize 中添加了一个缓冲区,使其比实际值低 5% 以允许比赛。不是理想的方法,但 ThreadPoolExecutor 的默认行为很奇怪,我找不到任何具有我描述的行为的外部 ThreadPools 实现。如果有,我会考虑更改接受的答案。
  • @RonaldChan 我认为这种方法比我们想象的要糟糕:它不仅可能不可靠,因为poolSize 周围缺乏锁定,它还阻止执行器重用任何线程直到线程池大小 == 最大值,即使从未要求执行器同时执行超过 1 个任务。我正在尝试一种我认为会更好的替代解决方案。
  • @RonaldChan 在答案末尾用另一种方法进行了更新。乍一看它可能看起来“丑陋”,但如果你看过去,我认为它比第一种方法更好。让我知道你的想法。
【解决方案2】:

我们通过以下代码找到了该问题的解决方案:

这个队列是一个混合的 SynchronousQueue / LinkedBlockingQueue。

public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> {
  private static final long serialVersionUID = 1L;

  private SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>();

  public OverflowingSynchronousQueue() {
    super();
  }

  public OverflowingSynchronousQueue(int capacity) {
    super(capacity);
  }

  @Override
  public boolean offer(E e) {
    // Create a new thread or wake an idled thread
    return synchronousQueue.offer(e);
  }

  public boolean offerToOverflowingQueue(E e) {
    // Add to queue
    return super.offer(e);
  }

  @Override
  public E take() throws InterruptedException {
    // Return tasks from queue, if any, without blocking
    E task = super.poll();
    if (task != null) {
      return task;
    } else {
      // Block on the SynchronousQueue take
      return synchronousQueue.take();
    }
  }

  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // Return tasks from queue, if any, without blocking
    E task = super.poll();
    if (task != null) {
      return task;
    } else {
      // Block on the SynchronousQueue poll
      return synchronousQueue.poll(timeout, unit);
    }
  }

}

为了让它工作,我们需要包装 RejectedExecutionHandler 以在任务被拒绝时调用“offerToOverflowingQueue”。

public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler {

  private OverflowingSynchronousQueue<Runnable> queue;
  private RejectedExecutionHandler adaptedRejectedExecutionHandler;

  public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue,
                                           RejectedExecutionHandler adaptedRejectedExecutionHandler)
  {
    super();
    this.queue = queue;
    this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler;
  }

  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!queue.offerToOverflowingQueue(r)) {
      adaptedRejectedExecutionHandler.rejectedExecution(r, executor);
    }
  }
}

这是我们如何创建 ThreadPoolExecutor

public static ExecutorService newSaturatingThreadPool(int corePoolSize,
                                                        int maxPoolSize,
                                                        int maxQueueSize,
                                                        long keepAliveTime,
                                                        TimeUnit timeUnit,
                                                        String threadNamePrefix,
                                                        RejectedExecutionHandler rejectedExecutionHandler)
  {
  OverflowingSynchronousQueue<Runnable> queue = new OverflowingSynchronousQueue<Runnable>(maxQueueSize);
  OverflowingRejectionPolicyAdapter rejectionPolicyAdapter = new OverflowingRejectionPolicyAdapter(queue,
                                                                                                     rejectedExecutionHandler);
  ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
                                                         maxPoolSize,
                                                         keepAliveTime,
                                                         timeUnit,
                                                         queue,
                                                         new NamedThreadFactory(threadNamePrefix),
                                                         rejectionPolicyAdapter);
  return executor;
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-03-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-23
    • 2018-05-16
    • 2016-04-04
    相关资源
    最近更新 更多