引言
Semaphore内部主要通过AQS(AbstractQueuedSynchronizer)实现线程的管理。
信号量通过使用计数器来控制对共享资源的访问。如果计数器大于零,则允许访问。如果为零,则拒绝访问。计数器的计数是允许访问共享资源的许可。
常用于限制可以访问某些资源的线程数目。
本例子通过增加与减少许可证数来动态控制线程数目,即控制线程并发。
核心源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| import java.util.concurrent.Semaphore;
public class AdjustableSemaphore {
private final ResizeableSemaphore semaphore = new ResizeableSemaphore();
private int maxPermits = 0;
public AdjustableSemaphore() { }
public synchronized void setMaxPermits(int newMax) { if (newMax < 1) { throw new IllegalArgumentException("Semaphore size must be at least 1, was:" + newMax); }
int delta = newMax - this.maxPermits;
if (delta == 0) { return; } else if (delta > 0) { this.semaphore.release(delta); } else { delta *= -1; this.semaphore.reducePermits(delta); }
this.maxPermits = newMax;
}
public int availablePermits() { return this.semaphore.availablePermits(); }
public void release() { this.semaphore.release(); }
public boolean tryAcquire() { return this.semaphore.tryAcquire(); }
private static final class ResizeableSemaphore extends Semaphore {
ResizeableSemaphore() { super(0); }
@Override protected void reducePermits(int reduction) { super.reducePermits(reduction); }
}
}
|