同步队列与条件队列

sync queue

同步队列.png

同步队列是双向链表,使用prev和next来连接节点,nextWaiter属性只是一个标志作用,共享锁模式下使用。
入队时没有锁 -> 在队列中争锁 -> 离开队列时获得了锁

condition queue

条件队列.png

条件队列是用nextWaiter连接节点的单链表。其waitStatus属性中只关注CONDITION,表示线程处于正常的等待状态。
入队时持有锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到sync queue

同步队列与条件队列的联系

同步队列与条件队列的联系.png

当调用某个条件队列的signal方法时,会将某个或所有等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程一样需要去争锁,
如果没有抢到,则要被加到等待锁的sync queue中去,此时节点就从condition queue中被转移到sync queue中。

CondtionObject源码分析

核心属性

1
2
3
4
5
6
7
8
9
10
/**
* First node of condition queue.
* 首哨兵
*/
private transient Node firstWaiter;
/**
* Last node of condition queue.
* 尾哨兵
*/
private transient Node lastWaiter;

条件等待await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Implements interruptible condition wait.
* 实现可中断条件wait。
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* 如果当前线程被中断,抛出InterruptedException。
* <li> Save lock state returned by {@link #getState}.
* 保存{@link #getState}返回的锁状态。
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* 使用保存的状态作为参数调用{@link #release},如果失败则抛出IllegalMonitorStateException。
* <li> Block until signalled or interrupted.
* 阻塞,直到发出信号或中断。
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* 通过调用{@link # acquisition}的专门化版本并将保存的状态作为参数重新获取。
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* 如果在步骤4中阻塞时中断,则抛出InterruptedException。
* </ol>
*/
public final void await() throws InterruptedException {
//线程中断,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//在等待队列中添加一个新的waiter。
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//节点不在等待队列中
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
//检查中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//如果下一个节点不为null
if (node.nextWaiter != null) // clean up if cancelled
//从条件队列中取消已取消的服务员节点的链接
unlinkCancelledWaiters();
//发生中断
if (interruptMode != 0)
//抛出InterruptedException,重新中断当前线程,或者什么都不做,这取决于模式。
reportInterruptAfterWait(interruptMode);
}

await方法中有几个重要的方法:addConditionWaiter(),fullyRelease(node),isOnSyncQueue(node),checkInterruptWhileWaiting(node),acquireQueued(node, savedState),reportInterruptAfterWait(interruptMode)

addConditionWaiter()

主要就是一些链表操作,将取消等待的节点去掉,新增等待节点在队尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
 /**
* Adds a new waiter to wait queue.
* 在等待队列中添加一个新的waiter。
*
* @return its new wait node 它的新等待节点
*/
private Node addConditionWaiter() {
//保存条件队列的最后一个节点。
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//如果lastWaiter被取消,请清理。
if (t != null && t.waitStatus != Node.CONDITION) {
//从条件队列中取消已取消的Waiter节点的链接
unlinkCancelledWaiters();
t = lastWaiter;
}
//新节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果当前等待队列中没有节点,则node为新的首哨兵
if (t == null)
firstWaiter = node;
else
//指针连接
t.nextWaiter = node;
//尾哨兵移动到node
lastWaiter = node;
return node;
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
* 从条件队列中取消已取消的服务员节点的链接。仅在持锁时调用。
* 当在条件等待期间发生取消时调用此函数,当看到lastWaiter已被取消时调用新waiter。
* 在没有信号的情况下,需要使用这种方法来避免垃圾保留。
* 因此,尽管它可能需要一个完整的遍历,但只有在没有信号的情况下超时或取消才会起作用。
* 它遍历所有节点,而不是停在一个特定的目标上,以断开到垃圾节点的所有指针的链接
* ,而不需要在取消风暴期间进行多次重新遍历。
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
//迭代等待队列,只要ws不为CONDITION,就移除节点
while (t != null) {
//保存当前节点的下一个节点
Node next = t.nextWaiter;
//ws不为CONDITION,准备移除此节点
if (t.waitStatus != Node.CONDITION) {
//当前节点的后继置空,GC
t.nextWaiter = null;
//最开始,trail为null,首哨兵指向当前节点的下一个节点
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
//如果已经没有后继节点
if (next == null)
//尾哨兵指向最后一个等待条件节点
lastWaiter = trail;
} else
//trail指向当前等待条件节点
trail = t;
//后移
t = next;
}
}
fullyRelease(node)

释放锁,只有获取了锁的线程,才能释放锁,即await()方法必须在获取锁之后才能使用。
由源码可知,不管重入了多少次锁,一次性全部释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
 /**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* 使用当前状态值调用release;返回保存的状态。取消节点并在失败时抛出异常。
*
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
//返回同步状态的当前值
int savedState = getState();
//释放锁
if (release(savedState)) {
//锁释放成功
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//锁释放失败的时候,将节点的ws设置为取消状态
node.waitStatus = Node.CANCELLED;
}
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
* 以独占模式释放,如果{@link #tryRelease}返回true,则通过解阻塞一个或多个线程来实现。
* 此方法可用于实现方法{@link Lock#unlock}。
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
//由子类实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒node的后继节点(如果存在的话)。
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Wakes up node's successor, if one exists.
* 唤醒node的后继节点(如果存在的话)。
* 释放锁和取消获取锁时被调用
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* 如果状态为负,清除预期信号,如果此操作失败或状态被等待线程更改,则没有问题。
* CANCELLED = 1
* 0
* SIGNAL = -1
* CONDITION = -2
* PROPAGATE = -3
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* Thread to unpark被保存在后续节点中,它通常只是下一个节点,
* 但是,如果取消或明显为空,则从tail向后遍历以找到实际的非取消后继。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从后向前遍历节点,最后s为离当前节点最近的需要被唤醒的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒操作
if (s != null)
LockSupport.unpark(s.thread);
}

isOnSyncQueue(node)

判断当前节点是否在同步队列,如果不在同步队列就挂起当前线程。
如果isOnSyncQueue检测到当前节点不在sync queue中,则说明既没有发生中断,也没有发生过signal,
则当前线程是被“假唤醒”的,那么我们将再次进入循环体,将线程挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* 如果一个节点(始终是最初放置在条件队列中的节点)现在正等待在同步队列上重新获取,则返回true。
*
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
* 从后向前查找节点是否存在此同步队列
*/
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
*
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
//从后向前遍历
Node t = tail;
for (; ; ) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

acquireQueued(node, savedState)

正常的获取锁,以前分析过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 获取队列中已存在线程的独占不可中断模式。
* 用于条件等待方法以及获取。
* 能走到这一步,那么这个等待锁的线程所封装的节点一定在等待队列中
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//循环,最终节点会获取到锁
for (; ; ) {
//获取节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点是头节点,那么就尝试一次获取锁
if (p == head && tryAcquire(arg)) {
//获取锁成功,当前节点变成了头节点,节点中的线程属性也清空。
setHead(node);
// help GC
p.next = null;
failed = false;
return interrupted;
}
//走到这,要么节点的前驱不是头节点,要么是获取锁失败了。
//如果前驱节点waitStatus为SIGNAL,挂起当前线程,并且检查中断
//如果前驱节点waitStatus不为SIGNAL,最终将其设置为SIGNAL
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//在这之前,线程已经被挂起了,坐等解阻塞
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

checkInterruptWhileWaiting(node)和reportInterruptAfterWait(interruptMode)

这两个方法就是处理线程中断的逻辑了,interruptMode有三个值:

  • 0
    代表整个过程中一直没有中断发生。
  • THROW_IE
    表示退出await()方法时需要抛出InterruptedException,这种模式对应于中断发生在signal之前
  • REINTERRUPT
    表示退出await()方法时只需要再自我中断以下,这种模式对应于中断发生在signal之后,即中断来的太晚了。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    /**
    * Checks for interrupt, returning THROW_IE if interrupted
    * before signalled, REINTERRUPT if after signalled, or
    * 0 if not interrupted.
    * 检查是否有中断,如果在信号之前中断,返回THROW_IE;如果在信号之后中断,
    * 返回REINTERRUPT;如果没有中断,返回0。
    */
    private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
    }
    /**
    * Transfers node, if necessary, to sync queue after a cancelled wait.
    * Returns true if thread was cancelled before being signalled.
    * 如果需要,在取消等待后传输节点来同步队列。如果线程在发出信号之前被取消,则返回true。
    *
    * @param node the node
    * @return true if cancelled before the node was signalled
    */
    final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    enq(node);
    return true;
    }
    /*
    * If we lost out to a signal(), then we can't proceed
    * until it finishes its enq(). Cancelling during an
    * incomplete transfer is both rare and transient, so just
    * spin.
    * 如果我们被signal()取代,那么我们就不能继续,直到它完成它的enq()。
    * 在一个不完整的转移过程中取消是罕见的,也是短暂的,所以只要旋转。
    */
    while (!isOnSyncQueue(node))
    //释放CUP
    Thread.yield();
    return false;
    }
    /**
    * Throws InterruptedException, reinterrupts current thread, or
    * does nothing, depending on mode.
    * 抛出InterruptedException,重新中断当前线程,或者什么都不做,这取决于模式。
    */
    private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
    throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
    selfInterrupt();
    }

唤醒signal()

只有获取锁的线程,才能执行这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
* 将等待时间最长的线程(如果存在的话)从该条件的等待队列移动到拥有锁的等待队列。
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
//如果仅针对当前(调用)线程保持同步,则返回true。
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//等待时间最长的线程,队首
Node first = firstWaiter;
if (first != null)
// 将节点从条件队列传输到同步队列,唤醒
doSignal(first);
}

最重要的就是doSignal(first)方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* 删除和传输节点,直到到达不可取消的或null。
* 从signal中分离出来,部分原因是为了鼓励编译器在没有等待器的情况下内联。
*
* @param first (non-null) the first node on condition queue 条件队列上的第一个节点
*/
private void doSignal(Node first) {
//循环,直到将节点从条件队列传输到同步队列返回false 和 条件队列上的第一个节点不为空。
do {
//向后移动一次首哨兵,如果当前首哨兵所在节点为空,则将尾哨兵设为空
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//断开第一个节点与后继节点之间的关系
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

ransferForSignal(Node node) 方法,就是将节点从调节队列转移到同步队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* 将节点从条件队列传输到同步队列。如果成功返回true。
*
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
* 如果无法更改等待状态,则节点已被取消。
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
* 将Splice连接到队列,并尝试设置前辈的等待状态,以指示线程(可能)正在等待。、
* 如果取消或尝试设置等待状态失败,则唤醒并重新同步
* (在这种情况下,等待状态可能是暂时错误的,并且不会造成任何危害)。
*/
Node p = enq(node);
int ws = p.waitStatus;
//节点的CAS等待状态字段。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//解阻塞
LockSupport.unpark(node.thread);
return true;
}