【问题标题】:ThreadPoolExecutor's queuing behavior customizable to prefer new thread creation over queuing?ThreadPoolExecutor 的排队行为可定制为更喜欢创建新线程而不是排队?
【发布时间】:2013-08-16 18:14:27
【问题描述】:

ThreadPoolExecutor 医生说

如果 corePoolSize 或更多线程正在运行,Executor 总是 更喜欢排队请求而不是添加新线程。


如果大于 corePoolSize 但小于 maximumPoolSize 线程运行,只有当队列被创建时才会创建一个新线程 满了。

有没有办法让执行程序更喜欢创建新线程,直到达到最大值,即使有超过核心大小的线程,然后开始排队?如果队列达到最大大小,任务将被拒绝。如果在处理了繁忙的突发后超时设置将启动并将线程删除到核心大小,那就太好了。我看到了喜欢排队以进行节流的原因;但是,这种自定义还允许队列主要充当尚未运行的任务列表。

【问题讨论】:

    标签: java java.util.concurrent threadpoolexecutor


    【解决方案1】:

    没有办法通过ThreadPoolExecutor获得这种确切的行为。

    但是,这里有几个解决方案:

    1. 考虑一下,

      • 如果少于corePoolSize 个线程正在运行,则会为每个排队的项目创建一个新线程,直到coorPoolSize 线程正在运行。

      • 仅当队列已满且运行的线程数少于maximumPoolSize 时才会创建新线程。

      因此,将 ThreadPoolExecutor 包装在一个监控项目排队速度的类中。然后,在提交许多项目时将核心池大小更改为更高的值。这将导致每次提交新项目时都会创建一个新线程。

      提交突发完成后,需要再次手动减小核心池大小,以便线程自然超时。如果您担心忙突发可能会突然结束,导致手动方法失败,请务必使用allowCoreThreadTimeout

    2. 创建固定线程池,并allowCoreThreadTimeout

      不幸的是,这在低提交突发期间使用了更多线程,并且在零流量期间不存储空闲线程。

    如果您有时间、需要和意愿,请使用第一种解决方案,因为它可以处理更广泛的提交频率,因此在灵活性方面是一个更好的解决方案。

    否则使用第二种解决方案。

    【讨论】:

    • 很好的建议,即使考虑到我的误解。队列仍然会被使用,因为核心大小会发生变化,但是,考虑到我正在寻找的东西(JMS),它不会真的那么有益。最接近我最初寻找的答案。
    【解决方案2】:

    只需执行Executors.newFixedThreadPool 所做的操作,并将coremax 设置为相同的值。这是来自 Java 6 的 newFixedThreadPool 源代码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    如果你有一个现有的,你可以做什么:

    ThreadPoolExecutor tpe = ... ;
    tpe.setCorePoolSize(tpe.getMaxPoolSize());
    

    编辑: 正如威廉在 cmets 中指出的那样,这意味着所有线程都是核心线程,因此没有一个线程会超时并终止。要更改此行为,只需使用ThreadPoolExecutor.allowCoreThreadTimeout(true)。这将使得线程可以在执行程序不使用时超时并被清除。

    【讨论】:

    • 更好地让核心线程通过allowCoreThreadTimeout死掉
    • 这是个好建议。我最初误解了队列的目的。这是为了允许在超过核心大小之前通过排队进行节流,同时将线程返回到核心/重用。我认为它将用于超过最大大小的队列...我相信这就是 JMS 的用途。
    【解决方案3】:

    您的偏好似乎是在活动较少时将延迟降至最低。为此,我只需将 corePoolSize 设置为最大值并让额外的线程挂起。在高活动期间,这些线程无论如何都会存在。在低活动时期,它们的存在不会产生太大影响。如果你想让它们死掉,你可以设置核心线程超时。

    这样所有线程将始终可用于尽快执行任务。

    【讨论】:

      【解决方案4】:

      自定义阻塞队列

      package com.gunjan;
      
      import java.util.concurrent.BlockingQueue;
      
      public abstract class CustomBlockingQueue<E> implements BlockingQueue<E> {
      
      public BlockingQueue<E> blockingQueue;
      
      public CustomBlockingQueue(BlockingQueue blockingQueue) {
          this.blockingQueue = blockingQueue;
      }
      
      @Override
      final public boolean offer(E e) {
          return false;
      }
      
      final public boolean customOffer(E e) {
          return blockingQueue.offer(e);
      }
      }
      

      线程池阻塞队列

      package com.gunjan;
      
      import java.util.Collection;
      import java.util.Iterator;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.TimeUnit;
      
      public class ThreadPoolBlockingQueue<E> extends CustomBlockingQueue<E> {
      
          public ThreadPoolBlockingQueue(BlockingQueue blockingQueue) {
              super(blockingQueue);
          }
      
          @Override
          public E remove() {
              return this.blockingQueue.remove();
          }
      
          @Override
          public E poll() {
              return this.blockingQueue.poll();
          }
      
          @Override
          public E element() {
              return this.blockingQueue.element();
          }
      
          @Override
          public E peek() {
              return this.blockingQueue.peek();
          }
      
          @Override
          public int size() {
              return this.blockingQueue.size();
          }
      
          @Override
          public boolean isEmpty() {
              return this.blockingQueue.isEmpty();
          }
      
          @Override
          public Iterator<E> iterator() {
              return this.blockingQueue.iterator();
          }
      
          @Override
          public Object[] toArray() {
              return this.blockingQueue.toArray();
          }
      
          @Override
          public <T> T[] toArray(T[] a) {
              return this.blockingQueue.toArray(a);
          }
      
          @Override
          public boolean containsAll(Collection<?> c) {
              return this.blockingQueue.containsAll(c);
          }
      
          @Override
          public boolean addAll(Collection<? extends E> c) {
              return this.blockingQueue.addAll(c);
          }
      
          @Override
          public boolean removeAll(Collection<?> c) {
              return this.blockingQueue.removeAll(c);
          }
      
          @Override
          public boolean retainAll(Collection<?> c) {
              return this.blockingQueue.retainAll(c);
          }
      
          @Override
          public void clear() {
              this.blockingQueue.clear();
          }
      
          @Override
          public boolean add(E e) {
              return this.blockingQueue.add(e);
          }
      
          @Override
          public void put(E e) throws InterruptedException {
              this.blockingQueue.put(e);
          }
      
          @Override
          public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
              return this.blockingQueue.offer(e, timeout, unit);
          }
      
          @Override
          public E take() throws InterruptedException {
              return this.blockingQueue.take();
          }
      
          @Override
          public E poll(long timeout, TimeUnit unit) throws InterruptedException {
              return this.blockingQueue.poll(timeout, unit);
          }
      
          @Override
          public int remainingCapacity() {
              return this.blockingQueue.remainingCapacity();
          }
      
          @Override
          public boolean remove(Object o) {
              return this.blockingQueue.remove(o);
          }
      
          @Override
          public boolean contains(Object o) {
              return this.blockingQueue.contains(o);
          }
      
          @Override
          public int drainTo(Collection<? super E> c) {
              return this.blockingQueue.drainTo(c);
          }
      
          @Override
          public int drainTo(Collection<? super E> c, int maxElements) {
              return this.blockingQueue.drainTo(c, maxElements);
          }
      }
      

      RejectedExecutionHandlerImpl

      package com.gunjan;
      
      import java.util.concurrent.RejectedExecutionException;
      import java.util.concurrent.RejectedExecutionHandler;
      import java.util.concurrent.ThreadPoolExecutor;
      
      public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
      
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              boolean inserted = ((CustomBlockingQueue) executor.getQueue()).customOffer(r);
              if (!inserted) {
                  throw new RejectedExecutionException();
              }
          }
      }
      

      CustomThreadPoolExecutorTest

      package com.gunjan;
      
      import java.util.concurrent.*;
      
      public class CustomThreadPoolExecutorTest {
      
      public static void main(String[] args) throws InterruptedException {
          LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<Runnable>(500);
          CustomBlockingQueue customLinkedBlockingQueue = new ThreadPoolBlockingQueue<Runnable>(linkedBlockingQueue);
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 100, 60, TimeUnit.SECONDS,
                  customLinkedBlockingQueue, new RejectedExecutionHandlerImpl());
      
      
          for (int i = 0; i < 750; i++) {
              try {
                  threadPoolExecutor.submit(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              Thread.sleep(1000);
                              System.out.println(threadPoolExecutor);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  });
              } catch (RejectedExecutionException e) {
                  e.printStackTrace();
              }
      
          }
      
          threadPoolExecutor.shutdown();
          threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
          System.out.println(threadPoolExecutor);
      }
      }
      

      【讨论】:

        猜你喜欢
        • 2013-12-13
        • 2018-01-31
        • 1970-01-01
        • 2018-11-18
        • 2013-04-19
        • 1970-01-01
        • 2011-12-09
        • 1970-01-01
        • 2011-06-09
        相关资源
        最近更新 更多