LionElegy

1 准备知识

介绍线程池之前先简要了解一下Executor,ExecutorService,Future,Callable,Executors是什么,和线程池又有什么关系

1.1 Executor

它是线程池*接口。它定义了一个方法void execute(Runnable)

这个方法是用于处理任务的一个服务方法,调用者提供Runnable接口的实现,线程池通过线程执行这个Runnable,该服务方法是无返回值的

1.2 ExecutorService

ExecutorService是Executor接口的子接口,它提供了一个新的服务方法submit,是有返回值的,返回值类型为Future类型(关于Future见1.3),它提供返回值主要是由Callable的call方法提供返回值(Callable见1.4) ,所有的线程池类型都实现这个接口

1.3 Future

顾名思义,Future->未来,代表线程任务执行结束后的结果。
获取线程执行结果的方式是通过get方法获取的,get有两种方式,有参和无参

无参T get()->阻塞等待线程执行结束,并得到结果。
有参T get(long, TimeUnit)->阻塞固定时长,等待线程执行结束后的结果,如果在阻塞时长范围内,线程未执行结束,抛出异常。

1.4 Callable

Callable类似Runnable接口,它有一个call方法,它的作用和Runnable中的run方法完全一致,但也有区别
Callable的call->有返回值,可以抛出任意异常
Runnable的run-> 无返回值,不能抛出未检查的异常

call方法的返回值就是Future中get方法的返回值

1.5 Executors

Executors是一个工具类,类似Collection和Collections的关系,可以更简单的创建若干种线程池,通过Executors可以直接得到想要的线程池

2 线程池

线程池状态: Running, ShuttingDown, Termitnaed

  • Running - 线程池正在执行中。活动状态。
  • ShuttingDown - 线程池正在关闭过程中。优雅关闭。一旦进入这个状态,线程池不再接收新的任务,处理所有已接收的任务,处理完毕后,关闭线程池。
  • Terminated - 线程池已经关闭。

2.1 固定容量线程池FixedThreadPool

FixedThreadPool是固定容量线程池,创建线程池的时候容量固定,使用的是BlockingQueue作为任务的载体,线程池默认的容量上限是Integer.MAX_VALUE

  • 特点:当任务数量大于线程池容量的时候,没有运行的任务保存在任务队列中,当线程有空闲的,自动从队列中取出任务执行
  • 使用场景: 大多数情况下,使用的线程池,首选推荐FixedThreadPool。OS系统和硬件是有线程支持上限。不能随意的无限制提供线程池。

下面是一个无返回值的小案例:
案例中创建了一个线程池,容量为5,执行6个任务,分析调用shutdown方法后,分析任务的执行情况

/**
 * 线程池
 * 固定容量线程池
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test_02_FixedThreadPool {
	
	public static void main(String[] args) {
		ExecutorService service = 
				Executors.newFixedThreadPool(5);
		for(int i = 0; i < 6; i++){
			service.execute(new Runnable() {
				@Override
				public void run() {
					try {
						TimeUnit.MILLISECONDS.sleep(500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " - test executor");
				}
			});
		}
		
		System.out.println("初始状态:" + service);

		System.out.println("开始调用shutdown方法=====");
		service.shutdown();
		// 是否已经结束, 相当于回收了资源。
		System.out.println("是否terminated:" + service.isTerminated());
		// 是否已经关闭, 是否调用过shutdown方法
		System.out.println("是否shutdown:" + service.isShutdown());
		System.out.println("shutdown后的状态:" + service);
		
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// service.shutdown();
		System.out.println("2秒过后任务全部执行完====");
		System.out.println("是否terminated:" + service.isTerminated());
		System.out.println("是否shutdown:" + service.isShutdown());
		System.out.println("任务全部执行完过后状态:" + service);
	}

}


结果:

从图中可以分析出以下几个过程
在初始状态:五个执行线程,1个任务在等待队列,0个完成任务

调用shutdown方法后:线程池未关闭(terminated为false),调用了shutdown(不再接收新任务),0个完成任务

两秒后任务执行完毕:线程池已关闭(terminated为true),调用了shutdown(不再接收新任务),6个完成任务

下面是一个有返回值的小案例:
案例中创建了一个线程池,容量为1,submit方法传了一个Callable,future通过get获取线程的返回值

/**
 * 线程池
 * 固定容量线程池(有返回值)
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class Test_03_Future {
	
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		
		ExecutorService service = Executors.newFixedThreadPool(1);

		Future<String> future = service.submit(new Callable<String>() {
			@Override
			public String call() {
				try {
					TimeUnit.MILLISECONDS.sleep(500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				return Thread.currentThread().getName() + " - test executor";
			}
		});
		System.out.println("线程是否结束: " + future.isDone()); // 查看线程是否结束, 任务是否完成。 call方法是否执行结束

		System.out.println("call方法的返回值: " + future.get()); // 获取call方法的返回值。
		System.out.println("线程是否结束: " + future.isDone());

		// 关闭线程池
		service.shutdown();
	}

}

结果:

2.2 CachedThreadPool

缓存的线程池, 容量不限(Integer.MAX_VALUE),自动扩容
容量管理策略:如果线程池中的线程数量不满足任务执行,创建新的线程。每次有新任务无法即时处理的时候,都会创建新的线程。当线程池中的线程空闲时长达到一定的临界值(默认60秒),自动释放线程,这里通过Executors.newCachedThreadPool()方法得到的线程池无法修改空闲时间,具体原因见下图,但可以通过自定义线程池ThreadPoolExecutor修改,具体方法见2.5,这里就不解释了

应用场景: 内部应用或测试应用。

  • 内部应用,有条件的内部数据瞬间处理时应用,如:电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的信心)。
  • 测试应用,在测试的时候,尝试得到硬件或软件的最高负载量,用于提供FixedThreadPool容量的指导

案例演示:

/**
 * 线程池
 * 无容量限制的线程池(最大容量默认为Integer.MAX_VALUE)
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test_05_CachedThreadPool {
	
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		
		System.out.println(service);
		
		for(int i = 0; i < 5; i++){
			service.execute(new Runnable() {
				@Override
				public void run() {
					try {
						TimeUnit.MILLISECONDS.sleep(500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " - test executor");
				}
			});
		}
		
		System.out.println(service);
		
		try {
			TimeUnit.SECONDS.sleep(65);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		System.out.println(service);
	}

}

2.3 计划任务线程池ScheduledThreadPool

ScheduledThreadPool是计划任务线程池,可以根据计划自动执行任务的线程池,底层实现是一个DelayedWorkQueue,它的一个主要方法scheduleAtFixedRate

有以下几个参数:

  • command - 要执行的任务
  • initialDelay - 第一次任务执行的间隔。
  • period - 多次任务执行的间隔。
  • unit - 多次任务执行间隔的时间单位。

案例:

/**
 * 线程池
 * 计划任务线程池。
 */
package com.bernardlowe.concurrent.t08;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test_07_ScheduledThreadPool {
	
	public static void main(String[] args) {
		ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
		System.out.println(service);
		
		// 定时完成任务。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
		// runnable - 要执行的任务。
		service.scheduleAtFixedRate(new Runnable() {
			@Override
			public void run() {
				try {
					TimeUnit.MILLISECONDS.sleep(500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName());
			}
		}, 0, 300, TimeUnit.MILLISECONDS);
		
	}

}

2.4 单一容量的线程池SingleThreadExecutor

单一容量的线程池,用法和FixedThreadPool类似,但和newFixedThreadPool不一样的是newSingleThreadExecutor创建的线程池又被一个FinalizableDelegatedExecutorService包装了一下
总结一下SingleThreadExecutor:

  • 单线任务处理的线程池
  • shutdown方法必然会被调用
  • 不具备ThreadPoolExecutor所有功能的线程池
    具体可以看看这篇文章:https://www.jianshu.com/p/2b7d853322bb

2.5 分支合并线程池ForkJoinPool

分支合并线程池(mapduce类似的设计思想),可以递归完成复杂任务,适合用于处理复杂任务
要求可分支合并的任务必须是ForkJoinTask类型的子类型
ForkJoinTask类型提供了两个抽象子类型:
RecursiveTask有返回结果的分支合并任务
RecursiveAction无返回结果的分支合并任务

案例:
这个案例做了一个以ForkJoinPool实现的数据累加,当计算数字区间大于MAX_SIZE=50000时,开启新的线程任务的计算,最后合并统计结果

/**
 * 线程池
 * 分支合并线程池。
 */
package com.bernardlowe.concurrent.t08;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class Test_08_ForkJoinPool {
	
	final static int[] numbers = new int[1000000];
	final static int MAX_SIZE = 500000;
	final static Random r = new Random();
	
	
	static{
		for(int i = 0; i < numbers.length; i++){
			numbers[i] = r.nextInt(1000);
		}
	}
	
	static class AddTask extends RecursiveTask<Long>{ // RecursiveAction
		int begin, end;
		public AddTask(int begin, int end){
			this.begin = begin;
			this.end = end;
		}
		
		// 
		protected Long compute(){
			if((end - begin) < MAX_SIZE){
				long sum = 0L;
				for(int i = begin; i < end; i++){
					sum += numbers[i];
				}
				// System.out.println("form " + begin + " to " + end + " sum is : " + sum);
				return sum;
			}else{
				int middle = begin + (end - begin)/2;
				AddTask task1 = new AddTask(begin, middle);
				AddTask task2 = new AddTask(middle, end);
				task1.fork();// 就是用于开启新的任务的。 就是分支工作的。 就是开启一个新的线程任务。
				task2.fork();
				// join - 合并。将任务的结果获取。 这是一个阻塞方法。一定会得到结果数据。
				return task1.join() + task2.join();
			}
		}
	}
	
	public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
		long result = 0L;
		for(int i = 0; i < numbers.length; i++){
			result += numbers[i];
		}
		System.out.println(result);
		
		ForkJoinPool pool = new ForkJoinPool();
		AddTask task = new AddTask(0, numbers.length);
		
		Future<Long> future = pool.submit(task);
		System.out.println(future.get());
		
	}

}

结果:该任务分类四个线程任务进行计算,最后汇总

2.5 ThreadPoolExecutor

ThreadPoolExecutor线程池的底层实现,除ForkJoinPool外,其他常用线程池底层都是使用ThreadPoolExecutor实现的,其中有一个构造方法如下:

  • corePoolSize:核心容量,创建线程池的时候,默认有多少线程。也是线程池保持的最少线程数
  • maximumPoolSize: 最大容量,线程池最多有多少线程
  • keepAliveTime: 生命周期,0为永久。当线程空闲多久后,自动回收
  • unit: 生命周期单位,为生命周期提供单位,如:秒,毫秒
  • workQueue 任务队列,阻塞队列。注意,泛型必须是Runnable

案例:

/**
 * 线程池
 * 固定容量线程池
 */
package com.bernardlowe.concurrent.t08;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test_09_ThreadPoolExecutor {
	
	public static void main(String[] args) {
		// 模拟fixedThreadPool, 核心线程5个,最大容量5个,线程的生命周期无限。
		ExecutorService service = 
				new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
						new LinkedBlockingQueue<Runnable>());
		
		for(int i = 0; i < 6; i++){
			service.execute(new Runnable() {
				@Override
				public void run() {
					try {
						TimeUnit.MILLISECONDS.sleep(500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " - test executor");
				}
			});
		}
		
		System.out.println(service);
		
		service.shutdown();
		System.out.println(service.isTerminated());
		System.out.println(service.isShutdown());
		System.out.println(service);
		
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		service.shutdown();
		System.out.println(service.isTerminated());
		System.out.println(service.isShutdown());
		System.out.println(service);
		
	}
}

分类:

技术点:

相关文章: