1、五种阻塞队列介绍
- ArrayBlockingQueue
有界队列,底层使用数组实现,并发控制使用ReentrantLock控制,不管是插入操作还是读取操作,都需要获取锁之后才能执行。
- LinkedBlockingQueue
底层基于单向链表实现,既可以当做有界队列,也可以当做无界队列使用。使用两个ReentrantLock实现并发控制:takelock和putlock。
- SynchronousQueue
底层使用单向链表实现,只有一个元素,同步的意思是一个写操作必须等到一个读操作之后才返回,指的是读写线程的同步。
- PriorityBlockingQueue
带排序的阻塞队列的实现,使用数组进行实现。并发控制使用ReentrantLock,队列为无界队列。
有初始化参数指定队列大小,但是会自动扩容。使用最小堆来实现排序。
- DelayedQueue
DelayedQueue是使用PriorityBlockingQueue和Delayed实现的,内部定义了一个优先级队列,当调用offer的时候,把Delayed对象加入队列中,使用take先把first对象拿出来(peek),如果没有到达阈值,进行await处理。
2、poll和peek的区别
都用于取队列的头结点,poll会删除头结点,peek不会删除头结点。
3、LinkedBlockingQueue
- 是先进先出队列FIFO。
- 采用ReentrantLock保证线程安全
3.1、功能
3.1.1、增加
增加有三种方式,前提:队列满
方式 |
put |
add |
offer |
特点 |
一直阻塞 |
抛异常 |
返回false |
3.1.2、删除
删除有三种方式,前提:队列为空
方式 |
remove |
poll |
take |
特点 |
NoSuchElementException |
返回false |
阻塞 |
3.2、简单分析
- LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。
- 基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。
- 头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
3.2.1、节点与属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
|
3.2.2、插入线程与获取线程的相互通知
signalNotEmpty()方法,在插入线程发现队列为空时调用,告知获取线程需要等待。
signalNotFull()方法,在获取线程发现队列已满时调用,告知插入线程需要等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
|
3.2.3、入队与出队操作
enqueue()方法只能在持有 putLock 锁下执行,dequeue()在持有 takeLock 锁下执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| private void enqueue(Node<E> node) { last = last.next = node; }
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; }
|
3.2.4、对两把锁的加锁与释放
在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。
1 2 3 4 5 6 7 8 9 10
| void fullyLock() { putLock.lock(); takeLock.lock(); }
void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
|
3.3、源码解读
简单介绍一下LinkedBlockingQueue中API的源码,如构造方法,新增,获取,删除,drainTo。
3.3.1、构造函数
LinkedBlockingQueue有三个构造方法,其中无参构造尽量少用,因为容量为Integer的最大值,操作不当会出现内存溢出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
|
3.3.2、offer(E e)
将给定的元素设置到队列中,如果设置成功返回true, 否则返回false。 e的值不能为空,否则抛出空指针异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
|
3.3.3、put(E e)
将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();
int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try {
while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
|
3.3.4、peek()
非阻塞的获取队列中的第一个元素,不出队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
|
3.3.5、poll()
非阻塞的获取队列中的值,未获取到返回null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
3.3.6、remove(Object o)
从队列中移除指定的值。将两把锁都锁定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); }
|
3.3.7、clear()
清空队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void clear() { fullyLock(); try { for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; if (count.getAndSet(0) == capacity) notFull.signal(); } finally { fullyUnlock(); } }
|
3.3.8、drainTo(Collection c)
将队列中值,全部移除,并发设置到给定的集合中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { int n = Math.min(maxElements, count.get()); Node<E> h = head; int i = 0; try { while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { if (i > 0) { head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }
|