Java并发编程笔记之ConcurrentLinkedQueue源码探究
JDK 中基于链表的⾮阻塞⽆界队列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 内部是如何使⽤ CAS ⾮阻塞算法来保证多线程下⼊队出队操作的线程安全?
ConcurrentLinkedQueue是线程安全的⽆界⾮阻塞队列,其底层数据结构是使⽤单向链表实现,⼊队和出队操作是使⽤我们经常提到的CAS来保证线程安全的。
我们⾸先看⼀下ConcurrentLinkedQueue的类图结构先,好有⼀个内部逻辑有⼀个⼤概的印象,如下图所⽰:
可以清楚的看到ConcurrentLinkedQueue内部的队列是使⽤单向链表⽅式实现,类中两个volatile 类型的Node 节点分别⽤来存放队列的⾸位节点。
⾸先我们先来看⼀下ConcurrentLinkedQueue的构造函数,如下:java笔记总结
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
通过⽆参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
Node节点内部则维护⼀个volatile 修饰的变量item ⽤来存放节点的值,next⽤来存放链表的下⼀个节点,从⽽链接为⼀个单向⽆界链表,这就是单向⽆界的根本原因。如下图:
接下来看ConcurrentLinkedQueue 主要关注⼊队,出队,获取队列元素的⽅法的源码,如下所⽰:
1.⾸先看⼊队⽅法offer,offer 操作是在队列末尾添加⼀个元素,如果传递的参数是 null 则抛出 NPE 异常,否者由于 ConcurrentLinkedQueue 是⽆界队列该⽅法⼀直会返回 true。另外由于使⽤ CAS ⽆阻塞算法,该⽅法不会阻塞调⽤线程,其源码如下:
public boolean offer(E e) {
//(1)e为null则抛出空指针异常
checkNotNull(e);
//(2)构造Node节点
final Node<E> newNode = new Node<E>(e);
//(3)从尾节点进⾏插⼊
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
//(4)如果q==null说明p是尾节点,则执⾏插⼊
if (q == null) {
//(5)使⽤CAS设置p节点的next节点
if (p.casNext(null, newNode)) {
//(6)cas成功,则说明新增节点已经被放⼊链表,然后设置当前尾节点
if (p != t)
casTail(t, newNode);  // Failure is OK.
return true;
}
}
else if (p == q)//(7)
//多线程操作时候,由于poll操作移除元素后有可能会把head变为⾃引⽤,然后head的next变为新head,所以这⾥需要
//重新新的head,因为新的head后⾯的节点才是正常的节点。
p = (t != (t = tail)) ? t : head;
else
//(8)寻尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
类图结构时候谈到构造队列时候参构造函数创建了⼀个 item 为 null 的哨兵节点,并且 head 和 tail 都是指向这个节点,下⾯通过图形结合来讲解下 offer 操作的代码实现。
  1.⾸先看⼀下,当⼀个线程调⽤offer(item)时候情况:⾸先代码(1)对传参判断空检查,如果为null 则抛出空指针异常,然后代码(2)则使⽤item 作为构造函数参数创建⼀个新的节点,
代码(3)从队列尾部节点开始循环,⽬的是从队列尾部添加元素。如下图:
上图是执⾏代码(4)时候队列的情况,这时候节点 p , t ,head ,tail 同时指向了item为null的哨兵节点,由于哨兵节点的next节点为null,所以这⾥q指向也是null。
代码(4)发现q==null  则执⾏代码(5),通过CAS原⼦操作判断p 节点的next节点是否为null,如果为null 则使⽤节点newNode替换p 的next节点,
然后执⾏代码(6),由于 p == t ,所以没有设置尾部节点,然后退出offer⽅法,这时候队列的状态图如下:
上⾯讲解的是⼀个线程调⽤offer⽅法的情况下,如果多个线程同时调⽤,就会存在多个线程同时执⾏到代码(5),假设线程A调⽤offer(item1),
线程B调⽤offer(item2),线程 A 和线程B同时到 p.casNext(null,newNode)。⽽CAS的⽐较并设置操作是原⼦性的,假设线程A先执⾏了⽐较设置操作,
则发现当前P的next节点确实是null ,则会原⼦性更新next节点为newNode,这时候线程B 也会判断p 的
next节点是否为null,结果发现不是null,(因为线程 A 已经设置了 p 的 next 为 newNode)则会跳到代码(3),
然后执⾏到代码(4)的时候的队列分布图如下:
根据这个状态图可知线程B会执⾏代码(8),然后q 赋值给了p,这个时候状态图为:
然后线程B再次跳转到代码(3)执⾏,当执⾏到代码(4)时候队列状态图为:
由于这时候q == null ,所以线程B 会执⾏步骤(5),通过CAS操作判断当前p的next 节点是否为null ,不是则再次循环后尝试,是则使⽤newNode替换,假设CAS成功了,那么执⾏步骤(6),
由于 p != t 所以设置tail节点为newNode ,然后退出offer⽅法。这时候队列的状态图为:
到现在为⽌,offer代码在执⾏路径现在就差步骤(7)还没有执⾏过,其实这个要在执⾏poll操作才会出现的,这⾥先看⼀下执⾏poll操作后可能会存在的⼀种情况,如下图所⽰:
下⾯分析下当队列处于这种状态调⽤offer添加元素代码执⾏到代码(4)的时候的队列状态图,如下:
由于q节点不为空并且p==q 所以执⾏代码(7),因为 t == tail所以p 被赋值为head ,然后进⼊循环,循环后执⾏到代码(4)的时候的队列状态图,如下:
由于 q ==null,所以执⾏代码(5),进⾏CAS操作,如果当前没有其他线程执⾏offer操作,则CAS操作会成功,p的next节点被设置为新增节点,然后执⾏代码(6),
由于p != t 所以设置新节点为队列尾节点,现在队列状态图,如下:
在这⾥的⾃引⽤的节点会被垃圾回收掉,可见offer操作⾥⾯关键步骤是代码(5)通过原⼦CAS操作来进⾏控制同时只有⼀个线程可以追加元素到队列末尾,进⾏cas竞争失败的线程,
则会通过循环⼀次次尝试进⾏cas操作,知道cas成功才会返回,也就是通过使⽤⽆限循环⾥⾯不断进⾏CAS尝试⽅式来替代阻塞算法挂起调⽤线程,相⽐阻塞算法,这是使⽤CPU资源换取阻塞带来的开销。
  2.poll操作,poll 操作是在队列头部获取并且移除⼀个元素,如果队列为空则返回 null,我们⾸先看改⽅法的源码,如下:
public E poll() {
//(1) goto标记
restartFromHead:
//(2)⽆限循环
for (;;) {
for (Node<E> h = head, p = h, q;;) {
/
/(3)保存当前节点值
E item = p.item;
//(4)当前节点有值则cas变为null
if (item != null && p.casItem(item, null)) {
//(5)cas成功标志当前节点以及从链表中移除
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//(6)当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。