⾯试官突击⼀问:你来讲讲AQS是什么吧?都是怎么⽤的?前⾔
在Java⾯试的时候,多线程相关的知识是躲不掉的,肯定会被问。我就被问到了AQS的知识,就直接了当的问,AQS知道是什么吧,来讲讲它是怎么实现的,以及哪些地⽅⽤到了它。当时⾃⼰确实没有讲好,所以这次来总结⼀下这个知识点。
此外,这边还整理了包括但不限于:分布式架构、⾼可扩展、⾼性能、⾼并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点⾼级进阶⼲货,如若需要学习借鉴,可
什么是AQS
AQS全称是AbstractQueuedSynchronizer,形如其名,抽象队列同步器。 AQS定义了两种资源共享模式:独占式,每次只能有⼀个线程持有锁,例如ReentrantLock实现的就是独占式的锁资源。
共享式,允许多个线程同时获取锁,并发访问共享资源,ReentrantWriteLock和CountDownLatch等就是实现的这种模式。
它维护了⼀个volatile的state变量和⼀个FIFO(先进先出)的队列。 其中state变量代表的是竞争资源标识,⽽队列代表的是竞争资源失败的线程排队时存放的容器。
1
2public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
3 ...
4 /**
5 * The synchronization state.
6 */
7 private volatile int state;
8 /**
9 * Wait queue node class.
10 **/
11 static final class Node {
12 ...
13 }
14 ...
15}
16
AQS中提供了操作state的⽅法:
getState();
setState();
compareSetState();
1
2protected final int getState() {
3 return state;
4}
5protected final void setState(int newState) {
6 state = newState;
7}
8protected final boolean compareAndSetState(int expect, int update) {
9 // See below for intrinsics setup to support this
10 return unsafepareAndSwapInt(this, stateOffset, expect, update);
11}
12
因为AbstractQueuedSynchronizer是⼀个抽象类,他采⽤模板⽅法的设计模式,规定了独占和共享模式需要实现的⽅法,并且将⼀些通⽤的功能已经进⾏了实现,所以不同模式的使⽤⽅式,只需要⾃⼰定义好实现共享资源的获取与释放即可,⾄于具体线程在等待队列中的维护(获取资源⼊队列、唤醒出队列等),AQS已经实现好了。
所以根据共享资源的模式⼀般实现的⽅法有如下⼏个:
isHeldExclusively();// 是否为独占模式;但是只有使⽤到了Condition的,才需要去实现它。例如:ReentrantLock。
boolean tryAcquire(int arg); // 独占模式;尝试获取资源,成功返回true,失败返回false。
boolean tryRelease(int arg) ; // 独占模式;尝试释放资源,成功返回true,失败返回false。
int tryAcquireShared(int arg); // 共享模式;尝试获取资源,负数表⽰失败;0表⽰成功,但是没有剩余可⽤资源了;正数表⽰成功,且有剩余可⽤资源。
boolean tryReleaseShared(int arg) ; // 共享模式;尝试释放资源,若释放资源后允许唤醒后续等待节点返回true,否则返回false。
上⾯的这⼏个⽅法在AbstractQueuedSynchronizer这个抽象类中,都没有被定义为abstract的,说明这些⽅法都是可以按需实现的,共享模式下可以只实现共享模式的⽅法(例如CountDownLatch),独占模式下可以只实现独占模式的⽅法(例如ReentrantLock),也⽀持两种都实现,两种模式都使⽤(例如ReentrantReadWriteLock)。
AQS源码分析
我们先简单介绍AQS的两种模式的实现类的代表ReentrantLock(独占模式)和CountDownLatch(共享模式),是如何来共享资源的⼀个过程,然后再详细通过AQS的源码来分析整个实现过程。
ReentrantLock在初始化的时候state=0,表⽰资源未被锁定。当A线程执⾏lock()⽅法时,会调⽤tryAcquire()⽅法,将AQS中队列的模式设置为独占,并将独占线程设置为线程A,以及将state+1。 这样在线程A没有释放锁前,其他线程来竞争锁,调⽤
tryAcquire()⽅法时都会失败,然后竞争锁失败的线程就会进⼊到队列中。当线程A调⽤执⾏unlock()⽅法将state=0后,其他线程才有机会获取锁(注意ReentrantLock是可重⼊的,同⼀线程多次获取锁时state值会进⾏垒加的,在释放锁时也要释放相应的次数才算完全释放了锁)。
CountDownLatch会将任务分成N个⼦线程去执⾏,state的初始值也是N(state与⼦线程数量⼀致)。N个⼦线程是并⾏执⾏的,每个⼦线程执⾏完成后countDown()⼀次,state会通过CAS⽅式减1。直到所有⼦线程执⾏完成后(state=0),会通过unpark()⽅法唤醒主线程,然后主线程就会从await()⽅法返回,继续后续操作。
独占模式分析
在AbstractQueuedSynchronizer的类⾥⾯有⼀个静态内部类Node。它代表的是队列中的每⼀个节点。 其中Node节点有如下⼏个属性:
2// 节点的状态
3volatile int waitStatus;
4// 当前节点的前⼀个节点
5volatile Node prev;
6// 当前节点的后⼀个节点
7volatile Node next;
8// 当前节点中所包含的线程对象
9volatile Thread thread;
10// 等待队列中的下⼀个节点
11Node nextWaiter;
12
每个属性代表的什么,已经写在代码注释中了。其中Node类中还有⼏个常量,代表了⼏个节点的状态(waitStatus)值。
1
2/** waitStatus value to indicate thread has cancelled */
3 static final int CANCELLED = 1;
4 /** waitStatus value to indicate successor's thread needs unparking */
5 static final int SIGNAL = -1;
6 /** waitStatus value to indicate thread is waiting on condition */
7 static final int CONDITION = -2;
8 /**
9 * waitStatus value to indicate the next acquireShared should
10 * unconditionally propagate
11 */
12 static final int PROPAGATE = -3;
13
⾸先节点的状态值waitStatus默认是0,然后下⾯⼏个常量有⾃⼰具体的含义。 CANCELLED = 1; 代
表的是当前节点从同步队列中取消,当timeout或被中断(响应中断的情况下),会触发变更为此状态,进⼊该状态后的结点将不会再变化。 SIGNAL = -1; 代表后继节点处于等待状态。后继结点⼊队时,会将前继结点的状态更新为SIGNAL。 CONDITION = -2; 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调⽤了 signal()⽅法后,该节点将会从等待队列中转移到同步队列中,加⼊到对同步状态的获取中。 PROPAGATE = -3; 表⽰在共享模式下,前继节点在释放资源后会唤醒后继节点,并将这种共享模式传播下去。
通过上⾯⼏个固定的常量值,我们可以看出节点状态中通常负数值通常表⽰节点处于有效的等待状态,⽽正数值代表节点已经被取消了。
所以AQS源码中有很多地⽅都⽤waitStatus>0或waitStatus<0这种⽅式来判断队列中节点的是否正常。
独占模式下,只能有⼀个线程占有锁资源,其他竞争资源的线程,在竞争失败后都会进⼊到等待队列中,等待占有锁资源的线程释放锁,然后再重新被唤醒竞争资源。
ReentrantLock加锁过程
ReentrantLock默认是⾮公平锁,就是说,线程在竞争锁的时候并不是按照先来后到的顺序来获取锁的,但是ReentrantLock也是⽀持公平锁的,在创建的时候传⼊⼀个参数值即可。 下⾯我们以Reentra
ntLock默认情况下的加锁来分析AQS的源码。 ReentrantLock并没有直接继承AQS类,⽽是通过内部类来继承AQS类的,这样⾃⼰的实现功能,⾃⼰⽤。 我们在⽤ReentrantLock加锁的时候都是调⽤的⽤lock()⽅法,那么我们来看看默认⾮公平锁下,lock()⽅法的源码:
2 * Sync object for non-fair locks
3 */
4static final class NonfairSync extends Sync {
5 private static final long serialVersionUID = 7316153563782823691L;
6
7 /**
8 * Performs lock. Try immediate barge, backing up to normal
9 * acquire on failure.
10 */
11 final void lock() {
12 if (compareAndSetState(0, 1))
13 setExclusiveOwnerThread(Thread.currentThread());
14 else
15 acquire(1);
16 }
17
18 protected final boolean tryAcquire(int acquires) {
19 return nonfairTryAcquire(acquires);
20 }
21}
22
通过源码可以看到,lock()⽅法,⾸先是通过CAS的⽅式抢占锁,如果抢占成功则将state的值设置为1。然后将对象独占线程设置为当前线程。
1
2protected final void setExclusiveOwnerThread(Thread thread) {
3 exclusiveOwnerThread = thread;
4}
5
如果抢占锁失败,就会调⽤acquire()⽅法,这个acquire()⽅法的实现就是在AQS类中了,说明具体抢占锁失败后的逻辑,AQS已经规定好了模板。
1
2public final void acquire(int arg) {
3 if (!tryAcquire(arg) &&
4 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
5 selfInterrupt();
6}
7
上⾯已经介绍了,独占模式是需要实现tryAcquire()⽅法的,这⾥⾸先就是通过tryAcquire()⽅法抢占锁,如果成功返回true,失败返回false。tryAcquire()⽅法的具体实现,是在ReentrantLock⾥⾯的,AQS类中默认是直接抛出异常的。
⾸先获取state值,如果state值为0,说明⽆锁,那么通过CAS尝试加锁,成功后,将独占线程设置为当前线程。
如果state值不为0,并且当前的独占线程和当前线程为同⼀线程,那么state重⼊次数加1。
如果state值不为0,并且当前线程不是独占线程,直接返回false。
2protected final boolean tryAcquire(int acquires) {
3 return nonfairTryAcquire(acquires);
4}
5final boolean nonfairTryAcquire(int acquires) {
6 final Thread current = Thread.currentThread();
7 int c = getState();// 获取state值
8 if (c == 0) {
9 // 如果state值为0,说明⽆锁,那么就通过cas⽅式,尝试加锁,成功后将独占线程设置为当前线程
10 if (compareAndSetState(0, acquires)) {
11 setExclusiveOwnerThread(current);
12 return true;
13 }
14 }
15 else if (current == getExclusiveOwnerThread()) { // 如果是同⼀个线程再次来获取锁,那么就将state的值进⾏加1处理(可重⼊锁的,重⼊次数)。
16 int nextc = c + acquires;
17 if (nextc < 0) // overflow
18 throw new Error("Maximum lock count exceeded");
19 setState(nextc);
20 return true;
21 }
22 return false;
23 }update是什么
24
我们继续来看acquire()⽅法,在执⾏完tryAcquire()⽅法后,如果加锁失败那么就会执⾏addWaiter()⽅法和acquireQueued(),这两个⽅法的作⽤是将竞争锁失败的线程放⼊到等待队列中。
image
addWaiter()⽅法的源码如下:
1
2private Node addWaiter(Node mode) {
3 // ⽤参数指定的模式将当前线程封装成队列中的节点(EXCLUSIVE【独占】,SHARED【共享】)
4 Node node = new Node(Thread.currentThread(), mode);
5 // Try the fast path of enq; backup to full enq on failure
6 Node pred = tail;
7 // tail是队列的尾部节点,初始时队列为空,尾部节点为null,直接调⽤enq将节点插⼊队列
8 if (pred != null) {
9 // 将当前线程节点的前级节点指向队列的尾部节点。
10 node.prev = pred;
11 // 通过CAS⽅式将节点插⼊到队列中
12 if (compareAndSetTail(pred, node)) {
13 // 插⼊成功后,将原先的尾部节点的后级节点指向新的尾部节点
14 = node;
15 return node;
16 }
17 }
18 // 如果尾部节点为空或通过CAS插⼊队列失败则通过enq⽅法插⼊节点
19 enq(node);
20 return node;
21}
22
addWaiter()中主要做了三件事:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论