作为BlockingQueue最常见的实现类之一,ArrayBlockingQueue是通过数组实现的FIFO先进先出有界阻塞队列,它的大小在实例被初始化的时候就被固定了,不能更改。该类支持一个可选的公平策略,用于被阻塞等待的线程获取独占锁的排序,因为ArrayBlockingQueue内部的操作都需要获取一个ReentrantLock锁,该锁是支持公平策略的,所以ArrayBlockingQueue的公平策略就直接作用于ReentrantLock锁,决定线程是否有公平获取锁的权利。默认情况下是非公平的,公平模式下队列按照FIFO顺序授予线程访问权。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。

源码分析

因为ArrayBlockingQueue的源码相对来说简单,这里只分析一些有代表性的方法,首先是构造方法,ArrayBlockingQueue提供了三个构造方法,除了队列的容量大小必须指定,还可以指定公平性和初始元素集合。只看三个参数的构造方法:

 1 public ArrayBlockingQueue(int capacity, boolean fair,
 2                           Collection<? extends E> c) {
 3     this(capacity, fair); //先执行两个参数的构造方法,初始化各个成员变量
 4 
 5     final ReentrantLock lock = this.lock;
 6     lock.lock(); // Lock only for visibility, not mutual exclusion
 7     try {
 8         int i = 0;
 9         try {
10             for (E e : c) {
11                 checkNotNull(e);
12                 items[i++] = e;
13             }
14         } catch (ArrayIndexOutOfBoundsException ex) {
15             throw new IllegalArgumentException();
16         }
17         count = i;
18         putIndex = (i == capacity) ? 0 : i;
19     } finally {
20         lock.unlock();
21     }
22 }

在该方法中,先执行了两个参数的构造方法初始化了所有成员变量。对于初始元素集合进行了额外的处理:循环将初始元素集合添加到队列中的数组中。这里为何要加锁呢?因为虽然用于保存元素的数组items是final修饰的,根据final关键字内存语义,不需要加锁,这里的循环将元素添加至数组中的操作也可以保证初始化结束之后数组元素对其它任何线程都可见,但是这里操作的count以及putIndex变量只是普通变量,所以这里加锁只是为了保证这些普通变量的内存可见性。

随便提一句,在JDK中作者习惯将在方法中多次(读取)操作的类成员变量赋予方法内部的局部变量,例如这里的 ReentrantLock lock = this.lock,其实是为了加快程序运行效率,每次都直接引用类成员,比如this.lock这样的读取操作相对于在方法内部直接操作方法栈的局部变量效率相对要低一点点,这可以通过javap生成具体的指令对比得到验证。

 接下来分析一个可阻塞的插入操作方法put:

public void put(E e) throws InterruptedException {
    checkNotNull(e); //禁止放入null
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //可打断的锁。
    try {
        while (count == items.length)
            notFull.await(); //如果队列满了阻塞
        enqueue(e); //被消费线程唤醒之后,进行入队操作
    } finally {
        lock.unlock();
    }
}

 ArrayBlockingQueue不允许插入null值,而且offer、poll、take、peek、size()等几乎所有方法都是以加锁的方式进行操作的(甚至是toString)。所以都是线程安全的。具体的入队操作由enqueue方法完成:

 1 private void enqueue(E x) {
 2     // assert lock.getHoldCount() == 1;
 3     // assert items[putIndex] == null;
 4     final Object[] items = this.items;
 5     items[putIndex] = x;  //直接放入putIndex位置
 6     if (++putIndex == items.length)
 7         putIndex = 0;  //如果已经满了,重新将下一个放入位置指向0位置
 8     count++;
 9     notEmpty.signal();
10 }

 因为在每一次调用enqueue方法之前都会加锁并判断队列是否满了,只有当队列不满的时候才会执行该方法,所以这里可以直接将x放入到putIndex的位置,putIndex用于指示可以放入元素的索引,并且队列满了之后可以直接将其指向0位置,这是ArrayBlockingQueue最重要的特性之一:队列实现中的数组会被循环利用,当放满之后,前面的位置如果空下来了,又会将putIndex指向0,在前面的位置依次放数据。

 1 private E dequeue() {  //出队方法的具体实现
 2     // assert lock.getHoldCount() == 1;
 3     // assert items[takeIndex] != null;
 4     final Object[] items = this.items;
 5     @SuppressWarnings("unchecked")
 6     E x = (E) items[takeIndex];
 7     items[takeIndex] = null;
 8     if (++takeIndex == items.length)
 9         takeIndex = 0;
10     count--;
11     if (itrs != null)
12         itrs.elementDequeued(); //通知所有迭代器做相关清理工作
13     notFull.signal();
14     return x;
15 }

 dequeue是出队方法pool、take的具体实现,与enqueue同理,这里可以放心的将takeIndex位置的元素取出,takeIndex用于指示下一次将拿数据的索引位置,每一次拿到一个数据都会将其指向下一个位置,并且在取得队尾时,才将指向对列头部,所以不要担心读取顺序的问题,它总是按照入队的顺序严格先进先出的。只不过执行入队操作的线程到底谁先执行确实由公平策略控制的。这里也体现了数组的循环利用,takeIndex指向的是队列的头(注意不是指数组的0索引位置),即处于队列中时间最长的那个元素,当依次取到数组结尾的时候,又会将其指向数组的0索引位置。

另一个特殊的方法就是removeAt方法(另一个remove方法最终也是调的该方法),因为它不向take、poll是移除头节点,而有可能是任意位置:

 1 void removeAt(final int removeIndex) {
 2     // assert lock.getHoldCount() == 1;
 3     // assert items[removeIndex] != null;
 4     // assert removeIndex >= 0 && removeIndex < items.length;
 5     final Object[] items = this.items;
 6     if (removeIndex == takeIndex) {//如果刚好是头节点,逻辑和take一样。
 7         // removing front item; just advance
 8         items[takeIndex] = null;
 9         if (++takeIndex == items.length)
10             takeIndex = 0;
11         count--;
12         if (itrs != null)
13             itrs.elementDequeued();//通知所有迭代器头节点出队
14     } else {
15         // an "interior" remove
16         //是一个内部节点的移除,那么需要将被删除节点后面所有有效节点往前挪。
17         // slide over all others up through putIndex.
18         final int putIndex = this.putIndex;
19         for (int i = removeIndex;;) {
20             int next = i + 1;
21             if (next == items.length)
22                 next = 0; //移除的是数组尾节点,next指向0
23             if (next != putIndex) { //如果移除节点与下一个放元素的节点之前有节点,那么就要把那些节点往前挪。
24                 items[i] = items[next];
25                 i = next;
26             } else { //直到挪到最后一个位置即putIndex,
27                 items[i] = null;
28                 this.putIndex = i; //将putIndex向前移动一个位置
29                 break;
30             }
31         }
32         count--;
33         if (itrs != null)
34             itrs.removedAt(removeIndex);//通过所有迭代器有内部节点出队
35     }
36     notFull.signal();
37 }

 removeAt方法的关键就在于如果移除的是头节点,那么同dequeue逻辑一样,只需要改变takeIndex即可,但如果移除的是中间的节点那么需要将被移除节点后面至可放入索引之间的节点依次往前挪

Iterator迭代器

ArrayBlockingQueue中对队列的操作源码非常简单,就不一一列举了,下面分析一下它的迭代器,由iterator方法可以看到它由内部类Itr实现:

public Iterator<E> iterator() {
    return new Itr();
}

由此可见,一个 ArrayBlockingQueue实例可见创建多个迭代器实例,每次调用iterator方法都是生成的全新的迭代器。那么既然可以创建多个迭代器,怎么管理这些迭代器使其能够在队列更新的时同步更新呢?不着急,这在创建迭代器实例的构造方法中就可以找到它的蛛丝马迹:

Java同步数据结构之ArrayBlockingQueue
 1 private class Itr implements Iterator<E> {
 2 
 3     /** Index to look for new nextItem; NONE at end */
 4     private int cursor; //指向下一个迭代元素的游标,可以循环最大后归0后继续增大,到达结束时为NONE对象即-1。
 5     //就是用于获得下一次调用next的返回对象。
 6 
 7     
 8     /** Element to be returned by next call to next(); null if none */
 9     private E nextItem; //下一个将拿到的元素,即调用Iterator.next()方法的返回值,没有元素则为null
10 
11     /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
12     private int nextIndex;//nextItem 下一个元素的索引,如果没有则为-1,如果已经通过其他方式删除了则为-2.
13 
14     /** Last element returned; null if none or not detached. */
15     private E lastItem; //上一次调用Iterator.next()返回的元素,没有或者不是“分离模式”就是null,
16     //lastItem是为了处理在hasNext()返回false之后调用iterator.remove()的极端情况删除预期的元素,
17       确保不会删除不该删除的元素,当然在这种其实迭代器已经处于分离模式下,
18       有可能还会发生其他内部元素的交叉删除而移动位置,有可能无法删除预期的元素,但是绝不会删除不该删除的元素。
19 
20     /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
21     private int lastRet;//lastItem 即上一个调用Iterator.next()返回的元素的索引,为了在调用next之后,调用remove的时候删除正确的元素,如果没有则为-1,如果已经通过其他方式删除了则为-2.
22     
23 
24     /** Previous value of takeIndex, or DETACHED when detached */
25     private int prevTakeIndex; //上一个takeIndex值,当detached时为-3.
26 
27     /** Previous value of iters.cycles */
28     private int prevCycles; //上一个cycles值。
29 
30     
31     /** Special index value indicating "not available" or "undefined" */
32     private static final int NONE = -1; //指示不可用或未定义的特殊索引值。
33 
34     /**
35      * Special index value indicating "removed elsewhere", that is,
36      * removed by some operation other than a call to this.remove().
37      */
38     private static final int REMOVED = -2;  //特殊的索引值,指示“已在别处删除”,即已经通过调用除了this.remove()之外的其他操作删除了。
39 
40     /** Special value for prevTakeIndex indicating "detached mode" */
41     private static final int DETACHED = -3; //指示“分离模式”prevTakeIndex特殊值。
42 
43     Itr() {
44         // assert lock.getHoldCount() == 0;
45         lastRet = NONE;
46         final ReentrantLock lock = ArrayBlockingQueue.this.lock;
47         lock.lock();
48         try {
49             if (count == 0) { //队列为空
50                 // assert itrs == null;
51                 cursor = NONE;
52                 nextIndex = NONE;
53                 prevTakeIndex = DETACHED;
54             } else {
55                 //takeIndex 指向的是下一个将被获取元素的索引,因为每一次获取元素之后,takeIndex都会被加1,指向下一处。
56                 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
57                 prevTakeIndex = takeIndex; //保存一下
58                 nextItem = itemAt(nextIndex = takeIndex);
59                 //nextItem就是takeIndex指向的下一个将会被获取的元素,也是下一次调用迭代器next时要返回的元素。
60                 //这里在初始化的时候都已经拿到了调用next将会返回的元素,所以即使在获取迭代器实例之后,在调用next之前该元素被移除队列,迭代器在调用next时依然会返回该元素。这里这样做的原因是为了避免在调用hasNext返回true报告有元素之后,在调用next之前由于出队操作导致元素被移除从而调用next的时候没有元素返回这种情况。
61                 
62                 //游标 指向的 takeIndex+1,如果该值已经到达队尾(但是队列不为空)则cursor为0,如果队列为空则-1.
63                 cursor = incCursor(takeIndex); 
64                 if (itrs == null) {
65                     itrs = new Itrs(this); //初始化迭代器列表维护器。
66                 } else {
67                     itrs.register(this); // 注册到迭代器链表中。
68                     itrs.doSomeSweeping(false); //清理过时的迭代器。
69                 }
70                 prevCycles = itrs.cycles;
71                 // assert takeIndex >= 0;
72                 // assert prevTakeIndex == takeIndex;
73                 // assert nextIndex >= 0;
74                 // assert nextItem != null;
75             }
76         } finally {
77             lock.unlock();
78         }
79     }
80     
81     //指示当前迭代器用完了,即可分离了。
82     boolean isDetached() {
83         // assert lock.getHoldCount() == 1;
84         return prevTakeIndex < 0;
85     }
86     //增加游标值,指向下一个takeIndex
87     private int incCursor(int index) {
88         // assert lock.getHoldCount() == 1;
89         if (++index == items.length)
90             index = 0;
91         if (index == putIndex) //到了下一个该入队元素的位置表示已经没有元素了。
92          //注意这里的 putIndex 会随着队列出入队操作不断变化,所以迭代器也会跟着变化
93             index = NONE;
94         return index;
95     }
View Code

相关文章:

  • 2021-12-26
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-05-11
  • 2021-12-21
  • 2021-11-20
  • 2022-12-23
猜你喜欢
  • 2021-08-14
  • 2022-01-11
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案