在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段。本章会配合一些应用场景来介绍如何使用这些工具类。
1,等待多线程完成的CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。
假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成(或者汇总结果)。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join()方法,如代码清单8-1所示。
1 import java.util.Random; 2 import java.util.concurrent.atomic.AtomicInteger; 3 4 public class JoinCountDownLatchTest { 5 private static Random sr=new Random(47); 6 private static AtomicInteger result=new AtomicInteger(0); 7 private static int threadCount=10; 8 private static class Parser implements Runnable{ 9 String name; 10 public Parser(String name){ 11 this.name=name; 12 } 13 @Override 14 public void run() { 15 int sum=0; 16 int seed=Math.abs(sr.nextInt()) ; 17 Random r=new Random(47); 18 for(int i=0;i<100;i++){ 19 sum+=r.nextInt(seed); 20 } 21 result.addAndGet(sum); 22 System.out.println(name+"线程的解析结果:"+sum); 23 } 24 } 25 public static void main(String[] args) throws InterruptedException { 26 Thread[] threads=new Thread[threadCount]; 27 for(int i=0;i<threadCount;i++){ 28 threads[i]=new Thread(new Parser("Parser-"+i)); 29 } 30 for(int i=0;i<threadCount;i++){ 31 threads[i].start(); 32 } 33 for(int i=0;i<threadCount;i++){ 34 threads[i].join(); 35 } 36 System.out.println("所有线程解析结束!"); 37 System.out.println("所有线程的解析结果:"+result); 38 } 39 }
输出:
Parser-1线程的解析结果:-2013585201
Parser-0线程的解析结果:1336321192
Parser-2线程的解析结果:908136818
Parser-5线程的解析结果:-1675827227
Parser-3线程的解析结果:1638121055
Parser-4线程的解析结果:1513365118
Parser-6线程的解析结果:489607354
Parser-8线程的解析结果:1513365118
Parser-7线程的解析结果:-1191966831
Parser-9线程的解析结果:-912399159
所有线程解析结束!
所有线程的解析结果:1605138237
join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中,wait(0)表示永远等待下去,代码片段如下。join在内部使用wait进行等待。
1 public class Thread implements Runnable { 2 ...... 3 public final void join() throws InterruptedException { 4 join(0); 5 } 6 public final synchronized void join(long millis) 7 throws InterruptedException { 8 long base = System.currentTimeMillis(); 9 long now = 0; 10 11 if (millis < 0) { 12 throw new IllegalArgumentException("timeout value is negative"); 13 } 14 15 if (millis == 0) {//执行到这里 16 while (isAlive()) { 17 wait(0);//main线程永远等待join线程 18 } 19 } else { 20 while (isAlive()) { 21 long delay = millis - now; 22 if (delay <= 0) { 23 break; 24 } 25 wait(delay); 26 now = System.currentTimeMillis() - base; 27 } 28 } 29 } 30 ...... 31 }