Post

AbstractQueuedSynchronizer

之前在写《ThreadPoolExecutor运行原理》的时候,有计划写一个关于Java并发同步的文章,不久前刚好看到 Java并发之AQS详解,对于Java 并发控制核心的 AQS 类,做了很好的解析。本着好记性不如烂笔头的原则,我将全文逐一做了摘抄,并对照着对应的源代码,从新对 AQS 相关的知识点做了一次梳理。

AbstractQueuedSynchronizer 是 Java 并发控制中很重要也是很复杂的一个类,为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。它通过维护一个volatile int state 表示共享资源,一个FIFO线程等待队列,多线程争用资源被堵塞时会进入此队列。

1
2
3
getState()
setState()
compareAndSetState()

上面三个方法用于管理代表共享也是竞争的资源的state,自定义同步器在实现的时候,需要实现共享资源state的获取与释放方式,并定义哪种状态对于此对象意味着被获取或被释放。线程等待队列的维护,包括获取资源失败进入队列和唤醒出队,在AQS中都实现好了。

AQS 默认支持两种模式:

exclusive 独占,只有一个线程能执行,比如 ReentrantLock share 共享,多项额线程可同时执行,比如 CountDownLatch

两种模式,对应下面的几个核心的方法

isHeldExclusively() : 该线程是否正在独占资源,用到condition才需要去实现 tryAcquire(int) :独占方式,尝试获取资源,成功返回true,失败返回false tryRelease(int) :独占方式,尝试释放资源,成功则返回true,失败则返回false tryAcquireShared(int):共享方式,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源 tryReleaseShared(int):共享方式,尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false。

ReentrantLockstate初始化为0,表示未锁定状态。A线程lock的时候,会调用tryAcquire独占该锁并将state+1。此后其他线程再tryAcquire()时候就会失败。直到A线程unlock()state=0,即释放锁,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念,但是要注意,获取多少次就要释放多少次,这样才能保证state是能回到0态的。

再以CountDownLatch为例,任务分成N个子任务去执行,state也初始化为N(N和线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1,等到所有子线程都执行完后(sate=0)会unpark主调用线程,然后主调用线程会从await()函数返回,继续后续动作。

acquire(int): 此方法是独占模式下线程获取共享资源的顶层入口,一般要通过tryAcquire来实现。如果获取到资源,线程直接返回。否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响,这也正是lock的语义。当然不仅仅只局限于lock()。获取到资源后,线程就可以去执行其临界区代码了。

1
2
3
4
5
public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}
  1. 调用自定义同步器的tryAcquire()尝试直接取获取资源,如果成功则直接返回;
  2. 没有成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式
  3. acquireQueued()使线程在等待队列中休息,有机会时(被unpark会去尝试获取资源)获取到资源后才返回。如果在整个等待过程中被中断返回true。否则返回false
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断(selfInterrupt),将中断补上

tryAcquire(int): 此方法尝试获取独占资源,如果获取成功,则直接返回true,否则直接返回false。这是trylock的语义。对于ReentrantLock.lock()的流程,就是一个AQS.acqure(1)

1
2
3
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

AQS只是一个框架,具体资源的获取、释放方式交由自定义同步器去实现,因此这里制定一个接口。具体需要自定义同步器通过对stateget/set/CAS去实现。能不能重入,能不能加塞(公平和非公平),也需要看具体的实现。自定义同步器在进行资源访问的时候需要考虑线程安全的影响。

addWaiter(Node): 此方法用于将当前线程加入到等待队列的队尾

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
	// 以给定模式构造节点,EXCLUSIVE和SHARED
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	// 将节点加入到队列中
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}

Node结点是对每一个访问同步代码的线程的抽象,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLEDSIGNALCONDITIONPROPAGATE

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatusCANCELLED,即结束状态,进入该状态后的结点将不会再变化。

  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。也就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。

  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Conditionsignal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。

  • 0状态:值为0,代表初始化状态。

AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**CAS的方式,加入到队列中**/
private Node enq(final Node node) {
// 自旋
	for (;;) {
		Node t = tail;
		if (t == null) { // Must initialize
		// 头为空,初始化一个Node
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
		// 放入到队尾
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

CAS自旋改变volatile变量。

通过tryAcquireaddWaiter,该线程获取资源失败,已经被加入到队列尾部,下一步就要进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,进行后续操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor();
			// 前驱就是head,也就没有没有其他等待的节点,尝试获取资源
			if (p == head && tryAcquire(arg)) {
			// 获取成功,将头节点设置为当前节点
				setHead(node);
				// 将原来的前驱的后继设置为null ,便于gc
				p.next = null; // help GC
				failed = false;
				// 返回没有中断
				return interrupted;
			}
			// 就进入waiting状态,直到被unpark()
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				// parkAndCheckInterrupt 的返回值为 Thread.interrupted(),表示当前线程是否被中断过
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

// ws == Node.SIGNAL (-1) 
// 处于等待唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
// 此方法主要用于检查状态,看看自己是否真的可以去休息。如果前驱节点不是SIGNAL,那么自己就不能安心休息,需要再次尝试。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到前驱的状态
	int ws = pred.waitStatus;
	// 如果已经告诉前驱拿完号后,通知自己,就可以进入休息状态
	if (ws == Node.SIGNAL)
		/*
		 * This node has already set status asking a release
		 * to signal it, so it can safely park.
		 */
		return true;
	if (ws > 0) {
	/**
	如果前驱放弃,那就一直向前找,直到找到最近一个正常等待状态的节点,并排在它的后边
	**/
		/*
		 * Predecessor was cancelled. Skip over predecessors and
		 * indicate retry.
		 */
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		/*
		 * waitStatus must be 0 or PROPAGATE.  Indicate that we
		 * need a signal, but don't park yet.  Caller will need to
		 * retry to make sure it cannot acquire before parking.
		 */
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	} 
	
	/**存疑问?为什么设置前驱为SIGNAL**/
	return false;
}

private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this); 
	return Thread.interrupted(); // 被唤醒后,检查自己是不是被中断过
}

park()会让当前线程进入waiting状态,在此状态下有两种路径可以唤醒该线程:unparkinterrupt

acquireQueued整体流程如下

  1. 节点进入队尾,检查状态,找到安全休息点
  2. 调用park()进入waiting状态,等待unpark()interrupt()唤醒自己
  3. 被唤醒后,看自己是不是有资格获取资源,如果拿到,head指向当前节点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
1
2
3
4
5
public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

release(int): acquire的反向操作,此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放(即state=0),它会唤醒等待队列里的其他线程来获取资源。也就是unlock的语义。

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
	if (tryRelease(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}

调用tryRelease()来释放资源

1
2
3
protected boolean tryRelease(int arg) {
	throw new UnsupportedOperationException();
}

tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现。正常来说,tryRelease都会成功,因为这是独占模式,该线程来释放资源,它肯定拿到了独占资源,直接减掉对应的资源即可(state -=arg),也不需要考虑线程安全的问题,但要注意它的返回值,release根据tryRelease的返回值来判断该线程是否已经完成资源释放了。在自定义同步器中,如果已经彻底释放资源(state = 0),要返回true,否则返回false

unparkSuccessor(Node): 此方法用于唤醒等待队列中的下一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void unparkSuccessor(Node node) {
	/*
	 * If status is negative (i.e., possibly needing signal) try
	 * to clear in anticipation of signalling.  It is OK if this
	 * fails or if status is changed by waiting thread.
	 
	 node 一般为当前线程所在的节点
	 */
	int ws = node.waitStatus;
	if (ws < 0)
	// 置零当前线程所在节点状态
		compareAndSetWaitStatus(node, ws, 0);

	/*
	 * Thread to unpark is held in successor, which is normally
	 * just the next node.  But if cancelled or apparently null,
	 * traverse backwards from tail to find the actual
	 * non-cancelled successor.
	 */
	Node s = node.next;
	// 找到下一个需要唤醒的节点s
	if (s == null || s.waitStatus > 0) {
	// 如果为空或者已取消
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)// <=0 的节点,都还是有效的节点
				s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);// 唤醒线程
}

简单地说,就是用unpark()唤醒等待队列中最前边的那个未放弃的线程。 这里用s来表示这个线程。再和acquireQueued联系起来,s被唤醒后,进入if(p == head && tryAcquire(arg))的判断(即使p!= head 也没有关系,它会再次进入 shouldParkFailedAcquire() 寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire()的调整,s也必然会跑到headnext节点,下一次自旋的时候,(p == head 就成立了)

1
2
3
4
5
6
7
8
9
10
11
12
for (;;) {
	final Node p = node.predecessor();
	if (p == head && tryAcquire(arg)) {
		setHead(node);
		p.next = null; // help GC
		failed = false;
		return interrupted;
	}
	if (shouldParkAfterFailedAcquire(p, node) &&
		parkAndCheckInterrupt())
		interrupted = true;
}

acquireShared(int):在共享模式下线程获取共享资源的顶层入口,会获取指定数量的资源,获取成功则直接返回。获取失败则进入等待队列。知道获取资源为止。整个过程忽略中断

1
2
3
4
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared需要自定义同步器去实现,AQS定义好了返回值的语义:

负数代表获取失败,将当前节点加入到队列;

0代表获取成功但没有剩余资源;

正数表示获取成功,还有剩余资源,其他线程还可以去获取。

  1. tryAcquireShared 尝试获取资源,成功则直接返回
  2. 失败通过doAcquireShared进入等待队列,直到获取资源为止才返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
    // 未当前节点创建一个共享类型的节点,并加入到队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
            // 如果是第二个Node,也就是前驱是head则尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // head指向自己,r为当前线程获取资源后的剩余值,也会尝试唤醒其他的后继节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 判断状态,寻找安全点 堵塞当前的线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued很类似。这里只有当线程是 head.next,也就是第二个Node的时候,才会尝试获取资源,如果有剩余的话还会唤醒后续的节点。假设存在这样的场景

  1. head释放了5个资源
  2. head.next需要6个资源,老三需要1个,老四需要2个。老大唤醒了老二,但是老二资源不够,但是老二并不会把资源让给老三,而是继续park等待其他线程释放资源。AQS保证严格的按照FIFO的顺序唤醒,保证了公平但是降低了并发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
        * Try to signal next queued node if:
        *   Propagation was indicated by caller,
        *     or was recorded (as h.waitStatus either before
        *     or after setHead) by a previous operation
        *     (note: this uses sign-check of waitStatus because
        *      PROPAGATE status may transition to SIGNAL.)
        * and
        *   The next node is waiting in shared mode,
        *     or we don't know, because it appears null
        *
        * The conservatism in both of these checks may cause
        * unnecessary wake-ups, but only when there are multiple
        * racing acquires/releases, so most need signals now or soon
        * anyway.
        */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

如果还有资源,尝试唤醒下一个节点。

acquireSharedacquire的区别在于,多了自己拿到资源后,还会唤醒后继节点的操作。这是共享的内涵。允许多个线程并发和同时执行。

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
    // 尝试释放资源
    if (tryReleaseShared(arg)) {
        // 唤醒后继节点
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared是共享模式下,线程释放共享资源的顶层入口。它会释放指定数量的资源,如果成功且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。

这里的逻辑和独占模式也很接近,不同的是,独占模式tryRelease在完全释放掉资源state=0后,才会返回true唤醒其他线程。这主要是基于独立可充入的考虑。而共享模式本质上是控制一定数量的线程并发执行。用于资源的线程在释放掉部分资源的时候,就可以唤醒后继等待节点。线程也不一定要释放自己有用的所有资源,而是可以释放部分资源。

ReentrantReadWriteLock读锁,的tryReleaseShared只有在完全释放资源才返回ture,这个自定义同步器可以根据自己的需要定义对应的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
    /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // 唤醒后继节点
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

同步类在使用AQS的时候,一般自定义Sync为内部类,供自己使用。对内的接口依赖于SyncSync只用实现资源state的获取-释放,线程的排队,等待,唤醒,都在上层的AQS中实现好了。

tryAcquiretryRelease就是使用和掌握AQS的核心。

This post is licensed under CC BY 4.0 by the author.