接下来继续BlockingQueue的另一个实现,优先级阻塞队列PriorityBlockingQueue。PriorityBlockingQueue是一个无限容量的阻塞队列,由于容量是无限的所以put等入队操作其实不存在阻塞,只要内存足够都能够立即入队成功,当然多个入队操作的线程之间还是存在竞争唯一锁的互斥访问。虽然PriorityBlockingQueue逻辑上是无界的,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError。

该队列也不允许放入null值,它使用与类java.util.PriorityQueue 相同的排序规则,也不允许放入不可比较的对象,这样做会导致ClassCastException。

值得注意的是,虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。注意peek方法永远只获取且不删除第一个元素,所以多次调用peek都是返回同样的值。

PriorityBlockingQueue其实是通过Comparator来排序的,要么入队的元素实现了Comparator接口(即所谓的自然排序),要么构造PriorityBlockingQueue实例的时候传入一个统一的Comparator实例,如果两者兼备那么以后者为准。PriorityBlockingQueue不保证具有相同优先级的元素顺序,但是你可以定义自定义类或比较器,通过辅助属性来决定优先级相同的元素的顺序,后文会举例说明。

PriorityBlockingQueue的实现使用了基于数组的平衡二叉堆(小的在上,大的在下方,即最小堆),使用单个ReentrantLock锁来保护公共操作,它虽然被设计成无界的,但是初始容量只有11,会随着元素的入队空间不够用再扩容,在扩容的时候使用了一个单独的自旋锁而不需要一直占用ReentrantLock锁,以保证消费线程的take等出队操作能够同时进行,避免了消费线程的反复延迟和随之而来的其它入队操作的元素积累,该类虽然有用到java.util.PriorityQueue,但是只是用于本队列的序列号与反序列化,以兼容老版本的实现,但是这种兼容性的维护将以开销瞬间翻倍为代价。所以没有学习过java.util.PriorityQueue也不影响对PriorityBlockingQueue的理解。

在队列不为空的任意时刻,PriorityBlockingQueue只保证队列中的第一个节点即head是当前最小或者是最高优先级的节点,peek、take、poll都是直接返回该head,至于head出队之后谁最小或者说谁最该被优先出队是由take或者poll方法在返回之前通过平衡二叉堆算法排序得出的,这个节点有可能就是队列中的某一个,也可能是在这段时间之内新入队的一个具有更高优先级的节点。

源码解析 

先来看看PriorityBlockingQueue的成员属性:

Java同步数据结构之PriorityBlockingQueue
 1 /**
 2  * Default array capacity. 默认的数组容量。
 3  */
 4 private static final int DEFAULT_INITIAL_CAPACITY = 11;
 5 
 6 /**
 7  * The maximum size of array to allocate. 要分配的数组的最大大小。
 8  * Some VMs reserve some header words in an array. 一些vm在数组中保留一些头信息
 9  * Attempts to allocate larger arrays may result in
10  * OutOfMemoryError: Requested array size exceeds VM limit
11    尝试分配更大的数组可能会导致OutOfMemoryError:请求的数组大小超过VM限制
12  */
13 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
14 
15 /**
16  * Priority queue represented as a balanced binary heap: the two
17  * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
18  * priority queue is ordered by comparator, or by the elements'
19  * natural ordering, if comparator is null: For each node n in the
20  * heap and each descendant d of n, n <= d.  The element with the
21    lowest value is in queue[0], assuming the queue is nonempty.
22  
23    优先级队列表现为一个平衡的二进制堆:queue[n]的两个子队列是queue[2*n+1]和queue[2*(n+1)]。
24    
25    优先级队列通过comparator或元素的自然顺序排序,如果comparator为null:对于堆中的每个节点n和n的每个子节点d, n <= d。假设队列非空,则queue[0]中的值最小。
26  
27  */
28 private transient Object[] queue;
29 
30 /**
31  * The number of elements in the priority queue. 优先级队列中的元素个数。
32  */
33 private transient int size;
34 
35 /**
36  * The comparator, or null if priority queue uses elements'
37  * natural ordering. 比较器,如果优先级队列使用元素的自然顺序排序,则为null。
38  */
39 private transient Comparator<? super E> comparator;
40 
41 /**
42  * Lock used for all public operations 用于所有公共操作的锁
43  */
44 private final ReentrantLock lock;
45 
46 /**
47  * Condition for blocking when empty
48  */
49 private final Condition notEmpty;
50 
51 /**
52  * Spinlock for allocation, acquired via CAS. 用于分配的自旋锁,通过CAS获取。
53  */
54 private transient volatile int allocationSpinLock;
55 
56 /**
57  * A plain PriorityQueue used only for serialization,
58  * to maintain compatibility with previous versions
59  * of this class. Non-null only during serialization/deserialization.
60    一个普通的PriorityQueue,仅用于序列化,以保持与该类以前版本的兼容性。
61    仅在序列化/反序列化期间非空。
62  */
63 private PriorityQueue<E> q;
View Code

相关文章:

  • 2021-12-26
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-05-11
  • 2021-12-21
  • 2021-11-20
  • 2022-12-23
猜你喜欢
  • 2021-08-14
  • 2022-01-11
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案