引言

共享锁的获取与独占锁很像,他们的区别就是当独占锁已经被某个线程持有时,其他线程只能等待它被释放后,才能去争锁,并且同一时刻只有一个线程能争锁成功。
而共享锁再获取锁之后,其他线程就可以去争锁,所以会有一个锁被多个线程持有的状态。

即:独占锁只有释放锁之后唤醒同步队列后面的节点,共享锁在获取到锁和释放锁之后都会唤醒同步队列后面的节点。

AQS中共享锁的获取逻辑代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
* 以共享模式获取,忽略中断。
* 首先调用至少一次{@link # tryAcquireShared},成功后返回。
* 否则,线程将排队,可能会反复阻塞和解除阻塞,调用{@link
* # tryAcquireShared},直到成功。
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
//尝试获取共享锁,由子类实现
if (tryAcquireShared(arg) < 0)
//通过循环去获取共享锁
doAcquireShared(arg);
}

可知只有两个方法:

tryAcquireShared(arg)

这个方法由子类实现,CountDownLatch的子类Sync与Semaphore中的子类Sync均实现了这个方法:
以CountDownLatch中的为例:

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

很简单,就是状态判断。

doAcquireShared(arg)

由AQS实现,Node节点种的nextWaiter属性不为空。这个方法很复杂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
 /**
* Acquires in shared uninterruptible mode.
* 以共享不可中断模式获取。
*
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//共享模式节点加入到同步队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是头节点
if (p == head) {
//获取锁
int r = tryAcquireShared(arg);
//大于等于0为获取锁成功
if (r >= 0) {
//设置头节点,并释放锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
//标志线程中断
selfInterrupt();
failed = false;
return;
}
}
//如果获取锁失败,阻塞当前线程,并检查中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//取消节点,为了响应式中断设计
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
//设置头节点
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
//节点没有取消,也不是初始化状态,同步队列中还有等待的线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果节点是共享模式,执行释放锁
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
* 共享模式释放锁
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (; ; ) {
//获取头节点
Node h = head;
//队列初始化了,且队列中有等待线程节点,即最起码有两个节点
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果头节点的状态时SIGNAL,则需要唤醒其后继节点
if (ws == Node.SIGNAL) {
//设置节点状态为0,失败则从新开始循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//设置成功后,唤醒后继节点
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//将头节点状态设置为PROPAGATE
continue; // loop on failed CAS
}
//如果头节点改变了,继续循环
if (h == head) // loop if head changed
break;
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获得前驱节点的ws
int ws = pred.waitStatus;
//如果前驱节点是SIGNAL,返回true,就会执行挂起当前线程操作
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//CANCELLED=1,所以如果ws>0,表示前驱节点已经释放锁了
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//1,前驱节点指向前驱节点的前驱
//2,当前节点的前驱指向前驱节点
// 即跳过取消了等待锁的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//新前驱节点的后继指向当前节点
pred.next = node;
} else {
//这种情况就直接将前驱节点的ws设置为SIGNAL
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
//线程被挂起了,不会向下执行了,等待被唤醒
LockSupport.park(this);
return Thread.interrupted();
}

从分析源码中可以看出,共享锁的释放依旧是状态的修改+循环CAS+队列。