https://www.cnblogs.com/jmsjh/p/7762034.html
转自 https://www.cnblogs.com/lic309/p/4186880.html

一:  ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段:

    private ThreadPoolExecutor threadPoolExecutor;

  可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现,

  直接看代码:

Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列
    @Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
        ThreadPoolExecutor executor  = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }
Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列

  这是ThreadPoolTaskExecutor用来初始化threadPoolExecutor的方法,BlockingQueue是一个阻塞队列,这个我们先不管。由于ThreadPoolTaskExecutor的实现方式完全是使用threadPoolExecutor进行实现,我们需要知道这个threadPoolExecutor的一些参数。

  

Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列
   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列

  这个是调用的构造函数:

  int corePoolSize:线程池维护线程的最小数量. 
  int maximumPoolSize:线程池维护线程的最大数量. 
  long keepAliveTime:空闲线程的存活时间. 
  TimeUnit unit: 时间单位,现有纳秒,微秒,毫秒,秒枚举值. 
  BlockingQueue<Runnable> workQueue:持有等待执行的任务队列. 
  RejectedExecutionHandler handler: 
  用来拒绝一个任务的执行,有两种情况会发生这种情况。 
  一是在execute方法中若addIfUnderMaximumPoolSize(command)为false,即线程池已经饱和; 
  二是在execute方法中, 发现runState!=RUNNING || poolSize == 0,即已经shutdown,就调用ensureQueuedTaskHandled(Runnable command),在该方法中有可能调用reject。

ThreadPoolExecutor池子的处理流程如下:  

1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

其会优先创建  CorePoolSiz 线程, 当继续增加线程时,先放入Queue中,当 CorePoolSiz  和 Queue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出错 误 org.springframework.core.task.TaskRejectedException

另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。

 

  这个是ThreadPoolExecutor的运算流程,既然ThreadPoolTaskExecutor是直接使用ThreadPoolExecutor进行处理,所以运算规则肯定一样。

在spring中使用ThreadPoolTaskExecutor的配置:

Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列
 <!-- 异步线程池 -->
    <bean 
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心线程数 -->
        <property name="corePoolSize" value="3" />
        <!-- 最大线程数 -->
        <property name="maxPoolSize" value="10" />
        <!-- 队列最大长度 >=mainExecutor.maxSize -->
        <property name="queueCapacity" value="25" />
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="300" />
        <!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.  -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
    </bean>
Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列

Reject策略预定义有四种: 

Reject 策略详解:https://blog.csdn.net/jgteng/article/details/54411423

(1)ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。 
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. 
(3)ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃. 
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程).

向spring容器中加入ThreadPoolTaskExecutor后,使用时只需要调用其的execute方法,其参数为一个Runnable。

Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列
threadPool.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println("=======");

                }
            });
Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列

ThreadPoolTaskExecutor有两个execute的重载,但翻看代码可以知道调用的是同一个方法,所以只调用execute就可以了

Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列
    @Override
    public void execute(Runnable task) {
        Executor executor = getThreadPoolExecutor();
        try {
            executor.execute(task);
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
        }
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        execute(task);
    }
Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列

在execute中调用的是ThreadPoolExecutor中的execute方法,执行了上面的处理流程后执行任务。

Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列
  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
Spring线程池ThreadPoolTaskExecutor的底层及阻塞队列

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-08
  • 2021-11-02
  • 2021-08-15
  • 2021-12-02
猜你喜欢
  • 2021-10-20
  • 2021-05-02
  • 2022-03-03
  • 2021-08-05
  • 2021-06-30
  • 2021-06-30
相关资源
相似解决方案