定义与性质

  • LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。
  • 基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。
  • 头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。

LinkedBlockingQueue继承关系图.png

节点与属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

/**
* Linked list node class
* 链表节点内部类
*/
static class Node<E> {
//节点元素
E item;

/**
* One of:
* 之一:
* - the real successor Node
* 真正的继承节点
* - this Node, meaning the successor is head.next
* 这个节点表示继承节点是head.next
* - null, meaning there is no successor (this is the last node)
* null,表示没有继承节点,它是尾节点
*/
Node<E> next;

Node(E x) {
item = x;
}
}

/**
* The capacity bound, or Integer.MAX_VALUE if none
* 容量界限,如果未设定,则为Integer最大值
*/
private final int capacity;

/**
* Current number of elements
* 当前元素个数
*/
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* 链表的头
* Invariant: head.item == null
* 不变量:head.item == null
*/
transient Node<E> head;

/**
* Tail of linked list.
* 链表的尾
* Invariant: last.next == null
* 不变量:last.next == null
*/
private transient Node<E> last;

/**
* Lock held by take, poll, etc
* take,poll等获取锁
*/
private final ReentrantLock takeLock = new ReentrantLock();

/**
* Wait queue for waiting takes
* 等待任务的等待队列
*/
private final Condition notEmpty = takeLock.newCondition();

/**
* Lock held by put, offer, etc
* put,offer等插入锁
*/
private final ReentrantLock putLock = new ReentrantLock();

/**
* Wait queue for waiting puts
* 等待插入的等待队列
*/
private final Condition notFull = putLock.newCondition();

插入线程与获取线程的相互通知

其中signalNotEmpty()方法,在插入线程发现队列为空时调用,告知获取线程需要等待。
signalNotFull()方法在获取线程发现队列已满时调用,告知插入线程需要等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
* 表示等待take。put/offer调用,否则通常不会锁定takeLock。
*/
private void signalNotEmpty() {
//获取takeLock
final ReentrantLock takeLock = this.takeLock;
//锁定takeLock
takeLock.lock();
try {
//唤醒take线程等待队列
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
}

/**
* Signals a waiting put. Called only from take/poll.
* 表示等待put,take/poll 调用
*/
private void signalNotFull() {
//获取putLock
final ReentrantLock putLock = this.putLock;
//锁定putLock
putLock.lock();
try {
//唤醒插入线程等待队列
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
}

入队与出队操作

enqueue()方法只能在持有 putLock 锁下执行,dequeue()在持有 takeLock 锁下执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Links node at end of queue.
* 在队列尾部插入
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
//last.next指向当前node
//尾指针后移
last = last.next = node;
}

/**
* Removes a node from head of queue.
* 移除队列头
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
//保存头指针
Node<E> h = head;
//获取当前链表第一个元素
Node<E> first = h.next;
//头指针的next指向自己
h.next = h; // help GC
//头指针指向第一个元素
head = first;
//获取第一个元素的值
E x = first.item;
//将第一个元素的值置空
first.item = null;
//返回第一个元素的值
return x;
}

对两把锁的加锁与释放

在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。
而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Locks to prevent both puts and takes.
* 锁定putLock和takeLock
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}

/**
* Unlocks to allow both puts and takes.
* 与fullyLock的加锁顺序相反,先解锁takeLock,再解锁putLock
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}