/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. * 创建一个容量为Integer最大值的LinkedBlockingQueue */ publicLinkedBlockingQueue() { this(Integer.MAX_VALUE); }
/** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * 创建一个指定容量的LinkedBlockingQueue * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ publicLinkedBlockingQueue(int capacity) { //参数校验 if (capacity <= 0) thrownewIllegalArgumentException(); //设置容量 this.capacity = capacity; //首尾节点指向一个空节点 last = head = newNode<E>(null); }
/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. * 创建一个{@code LinkedBlockingQueue},其容量为{@link Integer#MAX_VALUE}, * 最初包含给定集合的元素,按集合迭代器的遍历顺序添加。 * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ publicLinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); //获取putLock finalReentrantLockputLock=this.putLock; //锁定 putLock.lock(); // Never contended, but necessary for visibility try { intn=0; for (E e : c) { if (e == null) thrownewNullPointerException(); if (n == capacity) thrownewIllegalStateException("Queue full"); enqueue(newNode<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. * <p> * 如果可以在不超过队列容量的情况下立即插入指定的元素到队列的尾部, * 成功后返回{@code true},如果队列已满,返回{@code false}。 * <p> * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to * insert an element only by throwing an exception. * <p> * 当使用容量受限的队列时,此方法通常比方法{@link BlockingQueue#add add}更可取,后者只能通过抛出异常才能插入元素。 * 非阻塞 * * @throws NullPointerException if the specified element is null */ publicbooleanoffer(E e) { //非空判断 if (e == null) thrownewNullPointerException(); //计数器 finalAtomicIntegercount=this.count; //如果队列已满,直接返回插入失败 if (count.get() == capacity) returnfalse; intc= -1; //新建节点 Node<E> node = newNode<E>(e); //获取插入锁 finalReentrantLockputLock=this.putLock; //锁定 putLock.lock(); try { //如果队列未满 if (count.get() < capacity) { //插入队列 enqueue(node); ///计数 c = count.getAndIncrement(); //还有空余空间 if (c + 1 < capacity) //唤醒插入线程 notFull.signal(); } } finally { //解锁 putLock.unlock(); } //如果队列为空 if (c == 0) //通知获取线程阻塞 signalNotEmpty(); //返回成功或者插入失败 return c >= 0; }
/** * Inserts the specified element at the tail of this queue, waiting if * necessary up to the specified wait time for space to become available. * <p> * 将指定的元素插入到此队列的末尾,如果需要,将等待到指定的等待时间,直到空间可用为止。 * * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before space is available * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicbooleanoffer(E e, long timeout, TimeUnit unit) throws InterruptedException {
//不允许空元素 if (e == null) thrownewNullPointerException(); //等待时间 longnanos= unit.toNanos(timeout); //本地变量 intc= -1; //获取putLock finalReentrantLockputLock=this.putLock; //获取计数器 finalAtomicIntegercount=this.count; //可中断加锁,即在锁获取过程中不处理中断状态,而是直接抛出中断异常,由上层调用者处理中断。 putLock.lockInterruptibly(); try { //只要队列已满 while (count.get() == capacity) { //如果超时,则返回插入失败 if (nanos <= 0) returnfalse; //导致当前线程等待,直到发出信号或中断,或指定的等待时间过期。 nanos = notFull.awaitNanos(nanos); } //入队列 enqueue(newNode<E>(e)); //计数 c = count.getAndIncrement(); //如果队列增加1个元素还未满 if (c + 1 < capacity) //唤醒插入进程 notFull.signal(); } finally { //解锁 putLock.unlock(); }
//如果队列中没有元素了 if (c == 0) //通知获取线程等待 signalNotEmpty(); //返回插入成功 returntrue; }
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * <p> * 将指定的元素插入到此队列的末尾,如果需要,则等待空间可用。 * 阻塞 * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicvoidput(E e)throws InterruptedException { //不可以插入空元素 if (e == null) thrownewNullPointerException(); // Note: convention in all put/take/etc is to preset local var //所有put/take/etc中的约定都是预先设置本地var // holding count negative to indicate failure unless set. //除非设置,否则保持计数为负数表示失败。 intc= -1; //新建节点 Node<E> node = newNode<E>(e); //获取putLock finalReentrantLockputLock=this.putLock; //获取计数器 finalAtomicIntegercount=this.count;
//可中断加锁,即在锁获取过程中不处理中断状态,而是直接抛出中断异常,由上层调用者处理中断。 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. * 注意count在wait守卫线程中使用,即使它没有被锁保护。 * 这是因为count只能在此时减少(所有其他put都被锁定关闭), * 如果它从容量更改,我们(或其他一些等待put)将收到信号。 * 类似地,count在其他等待守卫线程中的所有其他用途也是如此。 */ //只要当前队列已满 while (count.get() == capacity) { //通知插入线程等待 notFull.await(); } //插入队列 enqueue(node); //数量加1 c = count.getAndIncrement(); //如果队列增加1个元素还未满 if (c + 1 < capacity) //唤醒插入进程 notFull.signal(); } finally { //解锁 putLock.unlock(); } //如果队列中没有元素了 if (c == 0) //通知获取线程等待 signalNotEmpty(); }
/** * Removes a single instance of the specified element from this queue, * if it is present. More formally, removes an element {@code e} such * that {@code o.equals(e)}, if this queue contains one or more such * elements. * 从此队列中删除指定元素的单个实例(如果存在)。 * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * 如果此队列包含指定的元素,则返回{@code true}(如果此队列由于调用而更改,则返回相同的值)。 * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call */ publicbooleanremove(Object o) { //不支持null if (o == null) returnfalse; //锁定两个锁 fullyLock(); try { //迭代队列 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //通过equals方法匹配待删除元素 if (o.equals(p.item)) { //移除p节点 unlink(p, trail); //成功 returntrue; } } //失败 returnfalse; } finally { //解锁 fullyUnlock(); } } /** * Unlinks interior Node p with predecessor trail. * 将内部节点p与前一个跟踪断开连接。 */ voidunlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. //p节点内容置空 p.item = null; //trail节点的next指向p的next trail.next = p.next; //如果p是队尾 if (last == p) //trail变为队尾 last = trail; //如果队列已满 if (count.getAndDecrement() == capacity) //通知插入线程阻塞 notFull.signal(); }
/** * Atomically removes all of the elements from this queue. * The queue will be empty after this call returns. * 原子性地从队列中删除所有元素。此调用返回后,队列将为空。 */ publicvoidclear() { //锁定 fullyLock(); try { //清空数据,帮助垃圾回收 for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; // assert head.item == null && head.next == null; //如果容量为0 if (count.getAndSet(0) == capacity) //唤醒插入线程 notFull.signal(); } finally { //解锁 fullyUnlock(); } }