AbstractQueuedSynchronizer
之前在写《ThreadPoolExecutor运行原理》的时候,有计划写一个关于Java并发同步的文章,不久前刚好看到 Java并发之AQS详解,对于Java 并发控制核心的 AQS 类,做了很好的解析。本着好记性不如烂笔头的原则,我将全文逐一做了摘抄,并对照着对应的源代码,从新对 AQS 相关的知识点做了一次梳理。
AbstractQueuedSynchronizer
是 Java 并发控制中很重要也是很复杂的一个类,为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。它通过维护一个volatile int state
表示共享资源,一个FIFO线程等待队列,多线程争用资源被堵塞时会进入此队列。
1
2
3
getState()
setState()
compareAndSetState()
上面三个方法用于管理代表共享也是竞争的资源的state
,自定义同步器在实现的时候,需要实现共享资源state
的获取与释放方式,并定义哪种状态对于此对象意味着被获取或被释放。线程等待队列的维护,包括获取资源失败进入队列和唤醒出队,在AQS中都实现好了。
AQS 默认支持两种模式:
exclusive
独占,只有一个线程能执行,比如 ReentrantLock
share
共享,多项额线程可同时执行,比如 CountDownLatch
两种模式,对应下面的几个核心的方法
isHeldExclusively()
: 该线程是否正在独占资源,用到condition才需要去实现 tryAcquire(int)
:独占方式,尝试获取资源,成功返回true,失败返回false tryRelease(int)
:独占方式,尝试释放资源,成功则返回true,失败则返回false tryAcquireShared(int)
:共享方式,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源 tryReleaseShared(int)
:共享方式,尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false。
ReentrantLock
,state
初始化为0,表示未锁定状态。A线程lock的时候,会调用tryAcquire
独占该锁并将state+1
。此后其他线程再tryAcquire()
时候就会失败。直到A线程unlock()
到state=0
,即释放锁,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state
会累加),这就是可重入的概念,但是要注意,获取多少次就要释放多少次,这样才能保证state
是能回到0态的。
再以CountDownLatch
为例,任务分成N个子任务去执行,state
也初始化为N(N和线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()
一次,state
会CAS减1,等到所有子线程都执行完后(sate=0)会unpark
主调用线程,然后主调用线程会从await()
函数返回,继续后续动作。
acquire(int)
: 此方法是独占模式下线程获取共享资源的顶层入口,一般要通过tryAcquire
来实现。如果获取到资源,线程直接返回。否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响,这也正是lock的语义。当然不仅仅只局限于lock()
。获取到资源后,线程就可以去执行其临界区代码了。
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 调用自定义同步器的
tryAcquire()
尝试直接取获取资源,如果成功则直接返回; - 没有成功,则
addWaiter()
将该线程加入等待队列的尾部,并标记为独占模式 acquireQueued()
使线程在等待队列中休息,有机会时(被unpark
会去尝试获取资源)获取到资源后才返回。如果在整个等待过程中被中断返回true。否则返回false- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断(
selfInterrupt
),将中断补上
tryAcquire(int)
: 此方法尝试获取独占资源,如果获取成功,则直接返回true,否则直接返回false。这是trylock的语义。对于ReentrantLock.lock()
的流程,就是一个AQS.acqure(1)
。
1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS只是一个框架,具体资源的获取、释放方式交由自定义同步器去实现,因此这里制定一个接口。具体需要自定义同步器通过对state
的get/set/CAS
去实现。能不能重入,能不能加塞(公平和非公平),也需要看具体的实现。自定义同步器在进行资源访问的时候需要考虑线程安全的影响。
addWaiter(Node)
: 此方法用于将当前线程加入到等待队列的队尾
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
// 以给定模式构造节点,EXCLUSIVE和SHARED
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 将节点加入到队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node
结点是对每一个访问同步代码的线程的抽象,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus
则表示当前被封装成Node
结点的等待状态,共有4种取值CANCELLED
、SIGNAL
、CONDITION
、PROPAGATE
。
CANCELLED
:值为1
,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus
为CANCELLED
,即结束状态,进入该状态后的结点将不会再变化。SIGNAL
:值为-1
,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。也就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL
状态的后继结点的线程执行。CONDITION
:值为-2
,与Condition
相关,该标识的结点处于等待队列中,结点的线程等待在Condition
上,当其他线程调用了Condition
的signal()
方法后,CONDITION
状态的结点将从等待队列转移到同步队列中,等待获取同步锁。PROPAGATE
:值为-3
,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。0
状态:值为0
,代表初始化状态。
AQS在判断状态时,通过用waitStatus>0
表示取消状态,而waitStatus<0
表示有效状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**CAS的方式,加入到队列中**/
private Node enq(final Node node) {
// 自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 头为空,初始化一个Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 放入到队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
CAS自旋改变volatile变量。
通过tryAcquire
和addWaiter
,该线程获取资源失败,已经被加入到队列尾部,下一步就要进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,进行后续操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 前驱就是head,也就没有没有其他等待的节点,尝试获取资源
if (p == head && tryAcquire(arg)) {
// 获取成功,将头节点设置为当前节点
setHead(node);
// 将原来的前驱的后继设置为null ,便于gc
p.next = null; // help GC
failed = false;
// 返回没有中断
return interrupted;
}
// 就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// parkAndCheckInterrupt 的返回值为 Thread.interrupted(),表示当前线程是否被中断过
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ws == Node.SIGNAL (-1)
// 处于等待唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
// 此方法主要用于检查状态,看看自己是否真的可以去休息。如果前驱节点不是SIGNAL,那么自己就不能安心休息,需要再次尝试。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到前驱的状态
int ws = pred.waitStatus;
// 如果已经告诉前驱拿完号后,通知自己,就可以进入休息状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/**
如果前驱放弃,那就一直向前找,直到找到最近一个正常等待状态的节点,并排在它的后边
**/
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
/**存疑问?为什么设置前驱为SIGNAL**/
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 被唤醒后,检查自己是不是被中断过
}
park()
会让当前线程进入waiting状态,在此状态下有两种路径可以唤醒该线程:unpark
和interrupt
acquireQueued
整体流程如下
- 节点进入队尾,检查状态,找到安全休息点
- 调用
park()
进入waiting状态,等待unpark()
或interrupt()
唤醒自己 - 被唤醒后,看自己是不是有资格获取资源,如果拿到,head指向当前节点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
release(int)
: acquire的反向操作,此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放(即state=0),它会唤醒等待队列里的其他线程来获取资源。也就是unlock
的语义。
1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用tryRelease()
来释放资源
1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
和tryAcquire()
一样,这个方法是需要独占模式的自定义同步器去实现。正常来说,tryRelease
都会成功,因为这是独占模式,该线程来释放资源,它肯定拿到了独占资源,直接减掉对应的资源即可(state -=arg
),也不需要考虑线程安全的问题,但要注意它的返回值,release
根据tryRelease
的返回值来判断该线程是否已经完成资源释放了。在自定义同步器中,如果已经彻底释放资源(state = 0
),要返回true,否则返回false
unparkSuccessor(Node)
: 此方法用于唤醒等待队列中的下一个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
node 一般为当前线程所在的节点
*/
int ws = node.waitStatus;
if (ws < 0)
// 置零当前线程所在节点状态
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 找到下一个需要唤醒的节点s
if (s == null || s.waitStatus > 0) {
// 如果为空或者已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)// <=0 的节点,都还是有效的节点
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);// 唤醒线程
}
简单地说,就是用unpark()
唤醒等待队列中最前边的那个未放弃的线程。 这里用s
来表示这个线程。再和acquireQueued
联系起来,s
被唤醒后,进入if(p == head && tryAcquire(arg))
的判断(即使p!= head
也没有关系,它会再次进入 shouldParkFailedAcquire()
寻找一个安全点。这里既然s
已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire()
的调整,s
也必然会跑到head
的next
节点,下一次自旋的时候,(p == head
就成立了)
1
2
3
4
5
6
7
8
9
10
11
12
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
acquireShared(int)
:在共享模式下线程获取共享资源的顶层入口,会获取指定数量的资源,获取成功则直接返回。获取失败则进入等待队列。知道获取资源为止。整个过程忽略中断
1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared
需要自定义同步器去实现,AQS定义好了返回值的语义:
负数代表获取失败,将当前节点加入到队列;
0代表获取成功但没有剩余资源;
正数表示获取成功,还有剩余资源,其他线程还可以去获取。
tryAcquireShared
尝试获取资源,成功则直接返回- 失败通过
doAcquireShared
进入等待队列,直到获取资源为止才返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
// 未当前节点创建一个共享类型的节点,并加入到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果是第二个Node,也就是前驱是head则尝试获取资源
int r = tryAcquireShared(arg);
if (r >= 0) {
// head指向自己,r为当前线程获取资源后的剩余值,也会尝试唤醒其他的后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断状态,寻找安全点 堵塞当前的线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
和acquireQueued
很类似。这里只有当线程是 head.next
,也就是第二个Node
的时候,才会尝试获取资源,如果有剩余的话还会唤醒后续的节点。假设存在这样的场景
head
释放了5个资源head.next
需要6个资源,老三需要1个,老四需要2个。老大唤醒了老二,但是老二资源不够,但是老二并不会把资源让给老三,而是继续park等待其他线程释放资源。AQS保证严格的按照FIFO的顺序唤醒,保证了公平但是降低了并发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
如果还有资源,尝试唤醒下一个节点。
acquireShared
和acquire
的区别在于,多了自己拿到资源后,还会唤醒后继节点的操作。这是共享的内涵。允许多个线程并发和同时执行。
1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// 尝试释放资源
if (tryReleaseShared(arg)) {
// 唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
releaseShared
是共享模式下,线程释放共享资源的顶层入口。它会释放指定数量的资源,如果成功且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
这里的逻辑和独占模式也很接近,不同的是,独占模式tryRelease
在完全释放掉资源state=0
后,才会返回true唤醒其他线程。这主要是基于独立可充入的考虑。而共享模式本质上是控制一定数量的线程并发执行。用于资源的线程在释放掉部分资源的时候,就可以唤醒后继等待节点。线程也不一定要释放自己有用的所有资源,而是可以释放部分资源。
ReentrantReadWriteLock
读锁,的tryReleaseShared
只有在完全释放资源才返回ture,这个自定义同步器可以根据自己的需要定义对应的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
同步类在使用AQS的时候,一般自定义Sync
为内部类,供自己使用。对内的接口依赖于Sync
,Sync
只用实现资源state
的获取-释放,线程的排队,等待,唤醒,都在上层的AQS中实现好了。
tryAcquire
和tryRelease
就是使用和掌握AQS的核心。