自从Java5开始,Java并发API提供了一套意在解决这些问题的机制。这套机制称之为执行器框架(Executor Framework),围绕着Executor接口和它的子接口ExecutorService,以及实现这两个接口的ThreadPoolEexecutor类展开。这套机制分离了任务的创建和执行。通过使用执行器,仅需要实现Runnable接口的对象,然后将这些对象发送给执行器即可。执行器通过创建所需的线程,来负责这些Runnable对象的创建、实例化以及运行。但是执行器功能不限于此,它使用了线程池来提高应用程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务,避免了不断的创建和销毁线程而导致系统性能下降。
执行器框架的另一个重要的优势是Callable接口。它类似于Runnable接口,但是却提供了两个方面的增强。
- 这个接口的主方法名称为call(),可以返回结果。
- 当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个对象来控制Callable对象的状态和结果。
1. 创建线程执行器
使用执行器框架的第一步是创建ThreadPoolExecutor对象。可以ThreadPoolExecutor类提供的四个构造器或者使用Executors工厂类来创建ThreadPoolExecutor对象。一旦有了执行器,就可以将Runnable或Callable对象发送给它去执行了。
下面我们将学习如何使用两种操作来实现一个范例,这个范例将模拟一个Web服务器来应对来自不同客户端的请求。
1. 实现Web服务器执行的任务,创建一个名为Task的类,并实现Runnable接口。
import java.util.Date; import java.util.concurrent.TimeUnit; public class Task implements Runnable { private Date initDate; private String name; public Task(String name){ initDate = new Date(); this.name = name; } @Override public void run() { System.out.printf("%s:Task %s: Created on: %s\n", Thread.currentThread().getName(), name, initDate); System.out.printf("%s:Task %s: Started on: %s\n", Thread.currentThread().getName(), name, new Date()); try { Long duration = (long) (Math.random()*10); System.out.printf("%s:Task %s: Doing a task during %d seconds\n", Thread.currentThread().getName(), name, duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("%s:Task %s: Finished on: %s\n", Thread.currentThread().getName(), name, new Date()); } }
2. 创建一个名为Server的类,它将执行通过接收器收到的每一个任务。
import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class Server { private ThreadPoolExecutor executor; public Server(){ //通过工厂类创建 executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();; } public void executTask(Task task){ System.out.println("Server: A new task has arrived\n"); executor.execute(task); System.out.printf("Server: Pool Size: %d\n", executor.getPoolSize()); System.out.printf("Server: Active Count: %d\n", executor.getActiveCount()); System.out.printf("Server: Completed Tasks: %d\n", executor.getCompletedTaskCount()); } public void endServer(){ executor.shutdown(); } }
3. 实现范例的主类,创建Main类,并实现main()方法。
public class Main { public static void main(String[] args) { Server server = new Server(); for(int i=0;i<100;i++){ Task task = new Task("Task"+i); server.executTask(task); } server.endServer(); } }
如果要执行新任务,缓存线程池就会创建新线程;如果线程所运行的任务执行完成后并且这个线程可用,那么缓存线程池将重用这些线程。线程重用的优点是减少了创建新线程所花费的开销。然而,新任务固定会依赖线程执行,因此缓存池也有缺点,如果发送过多的任务给执行器,系统的负荷将会过载。
备注:仅当线程的数量是合理的或者线程只会运行很短的时间时,适合使用Executors工厂类的newCachedThreadPool()方法来创建执行器。
执行器以及ThreadPoolExecutor类的一个重要的特性是,通常需要显式地去结束它。如果不这样做,那么执行器将继续执行,程序不会结束。为了完成执行器的执行,可以使用ThreadPoolExecutor类的shutdown()方法。
2. 创建固定大小的线程执行器
当使用Executors类的newCachedThreadPool()方法创建基本的ThreadPoolExecutor时,执行器运行过程中将碰到线程数量的问题。如果线程池里没有空闲的线程可用,那么执行器将为接收到的每一个任务创建一个新的线程,当发送大量的任务给执行器并且任务需要持续较长的时间时,系统就会超负荷,应用程序也将随之性能不佳。
为了避免这个问题,Executors工厂类提供了一个方法来创建一个固定大小的线程执行器。这个执行器有一个线程数的最大值,如果发送超过这个最大值的任务给执行器,执行器将不再创建额外的线程,剩下的任务将被阻塞直到执行器有空闲的线程可用。这个特性可以保证执行器不会给应用程序带来性能不佳的问题。
将上面的范例进行修改如下:
1. 将Server类中的执行器构造器进行修改,使用newFixedThreadPool()方法来创建执行器,并增加一条日志信息。
import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class Server { private ThreadPoolExecutor executor; public Server(){ //通过工厂类创建 executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);; } public void executTask(Task task){ System.out.println("Server: A new task has arrived\n"); executor.execute(task); System.out.printf("Server: Pool Size: %d\n", executor.getPoolSize()); System.out.printf("Server: Active Count: %d\n", executor.getActiveCount()); System.out.printf("Server: Completed Tasks: %d\n", executor.getCompletedTaskCount()); System.out.printf("Server: Task Count: %d\n", executor.getTaskCount()); } public void endServer(){ executor.shutdown(); } }
getTaskCount()方法可以用来显示有多少个任务已经发送给执行器。
Executors工厂类也提供了newSingleThreadExecutor()方法。这是一个创建固定大小线程执行器的极端场景,它将创建一个只有单个线程的执行器。因此,这个执行器只能在同一时间执行一个任务。
3. 在执行器中执行任务并返回结果
执行器框架的优势之一是,可以运行并发任务并返回结果。Java并发API通过以下两个接口来实现这个功能。
Callable:这个接口声明了call()方法。可以在这个方法里实现任务的具体逻辑操作。Callable接口是一个泛型接口,这就意味着必须声明call()方法返回的数据类型。
Future:这个接口声明了一些方法来获取由Callable对象产生的结果,并管理它们的状态。
下面我们学习如何实现任务的返回结果,并在执行器中运行任务。
1. 创建名为FactorialCalculator的类,并实现Callable接口,接口的泛型类型为Integer类型。
import java.util.concurrent.Callable; public class FactorialCalculator implements Callable<Integer> { private int number; public FactorialCalculator(int number){ this.number = number; } //计算阶乘 public Integer call() throws Exception { Integer result = 1; if(number==0||number==1) result = 1; else{ for(int i=2;i<=number;i++){ result *= i; //为了演示效果,休眠20ms Thread.sleep(20); } } System.out.printf("%s: %d\n", Thread.currentThread().getName(), result); return result; } }
2. 实现范例的主类,创建Main类,并实现main()方法。
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); //存放结果的列表 List<Future<Integer>> resultList = new ArrayList<>(); //通过Random类生成一个随机数生成器 Random random = new Random(); for(int i=0;i<10;i++){ int number = random.nextInt(10); FactorialCalculator calculator = new FactorialCalculator(number); Future<Integer> result = executor.submit(calculator); resultList.add(result); } //创建一个循环来监控执行器的状态 try { while(executor.getCompletedTaskCount()<resultList.size()){ System.out.printf("Main: Number of Completed Tasks: %d\n", executor.getCompletedTaskCount()); for(int i=0;i<resultList.size();i++){ Future<Integer> result = resultList.get(i); System.out.printf("Main: Task %d: %s\n", i, result.isDone()); } Thread.sleep(50); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Main: Results"); try { for(int i=0;i<resultList.size();i++){ Future<Integer> result = resultList.get(i); Integer number = null; number = result.get(); System.out.printf("Main: Task %d: %d\n", i, number); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } executor.shutdown(); } }
3. 程序运行结果如下
Main: Number of Completed Tasks: 0 Main: Task 0: false Main: Task 1: false Main: Task 2: false Main: Task 3: false Main: Task 4: false Main: Task 5: false Main: Task 6: false Main: Task 7: false Main: Task 8: false Main: Task 9: false Main: Number of Completed Tasks: 0 Main: Task 0: false Main: Task 1: false Main: Task 2: false Main: Task 3: false Main: Task 4: false Main: Task 5: false Main: Task 6: false Main: Task 7: false Main: Task 8: false Main: Task 9: false pool-1-thread-1: 720 Main: Number of Completed Tasks: 1 Main: Task 0: true Main: Task 1: false Main: Task 2: false Main: Task 3: false Main: Task 4: false Main: Task 5: false Main: Task 6: false Main: Task 7: false Main: Task 8: false Main: Task 9: false pool-1-thread-2: 362880 Main: Number of Completed Tasks: 2 Main: Task 0: true Main: Task 1: true Main: Task 2: false Main: Task 3: false Main: Task 4: false Main: Task 5: false Main: Task 6: false Main: Task 7: false Main: Task 8: false Main: Task 9: false pool-1-thread-2: 6 pool-1-thread-1: 720 pool-1-thread-1: 1 pool-1-thread-1: 1 pool-1-thread-1: 1 Main: Number of Completed Tasks: 7 Main: Task 0: true Main: Task 1: true Main: Task 2: true Main: Task 3: true Main: Task 4: false Main: Task 5: true Main: Task 6: true Main: Task 7: true Main: Task 8: false Main: Task 9: false Main: Number of Completed Tasks: 7 Main: Task 0: true Main: Task 1: true Main: Task 2: true Main: Task 3: true Main: Task 4: false Main: Task 5: true Main: Task 6: true Main: Task 7: true Main: Task 8: false Main: Task 9: false pool-1-thread-1: 5040 Main: Number of Completed Tasks: 8 Main: Task 0: true Main: Task 1: true Main: Task 2: true Main: Task 3: true Main: Task 4: false Main: Task 5: true Main: Task 6: true Main: Task 7: true Main: Task 8: true Main: Task 9: false pool-1-thread-2: 40320 Main: Number of Completed Tasks: 9 Main: Task 0: true Main: Task 1: true Main: Task 2: true Main: Task 3: true Main: Task 4: true Main: Task 5: true Main: Task 6: true Main: Task 7: true Main: Task 8: true Main: Task 9: false Main: Number of Completed Tasks: 9 Main: Task 0: true Main: Task 1: true Main: Task 2: true Main: Task 3: true Main: Task 4: true Main: Task 5: true Main: Task 6: true Main: Task 7: true Main: Task 8: true Main: Task 9: false pool-1-thread-1: 362880 Main: Results Main: Task 0: 720 Main: Task 1: 362880 Main: Task 2: 720 Main: Task 3: 6 Main: Task 4: 40320 Main: Task 5: 1 Main: Task 6: 1 Main: Task 7: 1 Main: Task 8: 5040 Main: Task 9: 362880