接下来继续BlockingQueue的另一个实现,优先级阻塞队列PriorityBlockingQueue。PriorityBlockingQueue是一个无限容量的阻塞队列,由于容量是无限的所以put等入队操作其实不存在阻塞,只要内存足够都能够立即入队成功,当然多个入队操作的线程之间还是存在竞争唯一锁的互斥访问。虽然PriorityBlockingQueue逻辑上是无界的,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError。
该队列也不允许放入null值,它使用与类java.util.PriorityQueue 相同的排序规则,也不允许放入不可比较的对象,这样做会导致ClassCastException。
值得注意的是,虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。注意peek方法永远只获取且不删除第一个元素,所以多次调用peek都是返回同样的值。
PriorityBlockingQueue其实是通过Comparator来排序的,要么入队的元素实现了Comparator接口(即所谓的自然排序),要么构造PriorityBlockingQueue实例的时候传入一个统一的Comparator实例,如果两者兼备那么以后者为准。PriorityBlockingQueue不保证具有相同优先级的元素顺序,但是你可以定义自定义类或比较器,通过辅助属性来决定优先级相同的元素的顺序,后文会举例说明。
PriorityBlockingQueue的实现使用了基于数组的平衡二叉堆(小的在上,大的在下方,即最小堆),使用单个ReentrantLock锁来保护公共操作,它虽然被设计成无界的,但是初始容量只有11,会随着元素的入队空间不够用再扩容,在扩容的时候使用了一个单独的自旋锁而不需要一直占用ReentrantLock锁,以保证消费线程的take等出队操作能够同时进行,避免了消费线程的反复延迟和随之而来的其它入队操作的元素积累,该类虽然有用到java.util.PriorityQueue,但是只是用于本队列的序列号与反序列化,以兼容老版本的实现,但是这种兼容性的维护将以开销瞬间翻倍为代价。所以没有学习过java.util.PriorityQueue也不影响对PriorityBlockingQueue的理解。
在队列不为空的任意时刻,PriorityBlockingQueue只保证队列中的第一个节点即head是当前最小或者是最高优先级的节点,peek、take、poll都是直接返回该head,至于head出队之后谁最小或者说谁最该被优先出队是由take或者poll方法在返回之前通过平衡二叉堆算法排序得出的,这个节点有可能就是队列中的某一个,也可能是在这段时间之内新入队的一个具有更高优先级的节点。
源码解析
先来看看PriorityBlockingQueue的成员属性:
1 /** 2 * Default array capacity. 默认的数组容量。 3 */ 4 private static final int DEFAULT_INITIAL_CAPACITY = 11; 5 6 /** 7 * The maximum size of array to allocate. 要分配的数组的最大大小。 8 * Some VMs reserve some header words in an array. 一些vm在数组中保留一些头信息 9 * Attempts to allocate larger arrays may result in 10 * OutOfMemoryError: Requested array size exceeds VM limit 11 尝试分配更大的数组可能会导致OutOfMemoryError:请求的数组大小超过VM限制 12 */ 13 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 14 15 /** 16 * Priority queue represented as a balanced binary heap: the two 17 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 18 * priority queue is ordered by comparator, or by the elements' 19 * natural ordering, if comparator is null: For each node n in the 20 * heap and each descendant d of n, n <= d. The element with the 21 lowest value is in queue[0], assuming the queue is nonempty. 22 23 优先级队列表现为一个平衡的二进制堆:queue[n]的两个子队列是queue[2*n+1]和queue[2*(n+1)]。 24 25 优先级队列通过comparator或元素的自然顺序排序,如果comparator为null:对于堆中的每个节点n和n的每个子节点d, n <= d。假设队列非空,则queue[0]中的值最小。 26 27 */ 28 private transient Object[] queue; 29 30 /** 31 * The number of elements in the priority queue. 优先级队列中的元素个数。 32 */ 33 private transient int size; 34 35 /** 36 * The comparator, or null if priority queue uses elements' 37 * natural ordering. 比较器,如果优先级队列使用元素的自然顺序排序,则为null。 38 */ 39 private transient Comparator<? super E> comparator; 40 41 /** 42 * Lock used for all public operations 用于所有公共操作的锁 43 */ 44 private final ReentrantLock lock; 45 46 /** 47 * Condition for blocking when empty 48 */ 49 private final Condition notEmpty; 50 51 /** 52 * Spinlock for allocation, acquired via CAS. 用于分配的自旋锁,通过CAS获取。 53 */ 54 private transient volatile int allocationSpinLock; 55 56 /** 57 * A plain PriorityQueue used only for serialization, 58 * to maintain compatibility with previous versions 59 * of this class. Non-null only during serialization/deserialization. 60 一个普通的PriorityQueue,仅用于序列化,以保持与该类以前版本的兼容性。 61 仅在序列化/反序列化期间非空。 62 */ 63 private PriorityQueue<E> q;