引言

Condition是用来代替传统Object中的wait()和notify()实现线程间的协作,Condition的await()和signal()用于处理线程间协作更加安全与高效,JAVA中的阻塞队列就是用Condision实现。Condition的使用必须在锁定与解锁直接使用,且只能通过lock.newCondition()获取。

使用例子:

假设有一个有界的缓冲区,它支持put和take方法。如果在空缓冲区上尝试take,那么线程将阻塞,直到缓冲区中有数据可用为止;如果在一个数据已满的缓冲区上尝试put,那么线程将阻塞,直到空间可用为止。我们希望在单独的等待集中继续等待put线程和take线程,这样我们就可以在缓冲区中的数据或空间可用时只通知单个线程。这可以使用两个Condition实例来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//有界缓冲
class BoundedBuffer {
//重入锁
final Lock lock = new ReentrantLock();
//未满
final Condition notFull = lock.newCondition();
//未空
final Condition notEmpty = lock.newCondition();

//缓冲数组
final Object[] items = new Object[100];
int putptr, takeptr, count;

//插入
public void put(Object x) throws InterruptedException {
//锁定
lock.lock();
try {
//当数组已满,其他插入线程等待,阻塞
while (count == items.length)
notFull.await();
//插入
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
唤醒获取线程
notEmpty.signal();
} finally {
//解锁
lock.unlock();
}
}
//获取
public Object take() throws InterruptedException {
//锁定
lock.lock();
try {
//缓冲区没有值,等待
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
//唤醒插入线程
notFull.signal();
return x;
} finally {
//解锁
lock.unlock();
}
}
}

AbstractQueuedSynchronizer中的ConditionObject实现了Condition接口,而AQS是实现Lock的基石,
一般锁的内部都有同步器,即锁的内部都会有Condision的实现,所以才会用lock.newCondition()获取Condision。

await()源码翻译:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* Implements interruptible condition wait.
* 实现可中断条件wait。
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* 如果当前线程被中断,抛出InterruptedException。
* <li> Save lock state returned by {@link #getState}.
* 保存{@link #getState}返回的锁状态。
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* 使用保存的状态作为参数调用{@link #release},如果失败则抛出IllegalMonitorStateException。
* <li> Block until signalled or interrupted.
* 阻塞,直到发出信号或中断。
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* 通过调用{@link # acquisition}的专门化版本并将保存的状态作为参数重新获取。
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* 如果在步骤4中阻塞时中断,则抛出InterruptedException。
* </ol>
*/
public final void await() throws InterruptedException {
//线程中断,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//在等待队列中添加一个新的waiter。
Node node = addConditionWaiter();
//释放节点(释放锁)
int savedState = fullyRelease(node);
int interruptMode = 0;
//节点不在等待队列中
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
//检查中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//如果下一个节点不为null
if (node.nextWaiter != null) // clean up if cancelled
//从条件队列中取消已取消的服务员节点的链接
unlinkCancelledWaiters();
//发生中断
if (interruptMode != 0)
//抛出InterruptedException,重新中断当前线程,或者什么都不做,这取决于模式。
reportInterruptAfterWait(interruptMode);
}

signal()源码翻译:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
      /**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
* 将等待时间最长的线程(如果存在的话)从该条件的等待队列移动到拥有锁的等待队列。
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
//如果仅针对当前(调用)线程保持同步,则返回true。
//没有拿到锁的线程,不能调用此方法
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//等待时间最长的线程,队首
Node first = firstWaiter;
if (first != null)
// 将节点从条件队列传输到同步队列,唤醒
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* 删除和传输节点,直到到达不可取消的或null。
* 从signal中分离出来,部分原因是为了鼓励编译器在没有等待器的情况下内联。
*
* @param first (non-null) the first node on condition queue 条件队列上的第一个节点
*/
private void doSignal(Node first) {
//循环,直到将节点从条件队列传输到同步队列返回false 和 条件队列上的第一个节点不为空。
do {
//向后移动一次首哨兵,如果当前首哨兵所在节点为空,则将尾哨兵设为空
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//断开第一个节点与后继节点之间的关系
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* 将节点从条件队列传输到同步队列。如果成功返回true。
*
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
* 如果无法更改等待状态,则节点已被取消。
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
* 将Splice连接到队列,并尝试设置前辈的等待状态,以指示线程(可能)正在等待。、
* 如果取消或尝试设置等待状态失败,则唤醒并重新同步
* (在这种情况下,等待状态可能是暂时错误的,并且不会造成任何危害)。
*/
Node p = enq(node);
int ws = p.waitStatus;
//节点的CAS等待状态字段。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//解阻塞
LockSupport.unpark(node.thread);
return true;
}

从上面段源码可知,最底层是用LockSupport.park(this)与LockSupport.unpark(node.thread)来阻塞与解阻塞线程。
一个ReenTrantLock对应一个AQS阻塞队列(同步队列),然后对应多个condition。每个condition对应一个条件队列(等待队列)。
这篇就学习到这,后期学习AQS。