引言

Semaphore内部主要通过AQS(AbstractQueuedSynchronizer)实现线程的管理。
信号量通过使用计数器来控制对共享资源的访问。如果计数器大于零,则允许访问。如果为零,则拒绝访问。计数器的计数是允许访问共享资源的许可。
常用于限制可以访问某些资源的线程数目。

本例子通过增加与减少许可证数来动态控制线程数目,即控制线程并发。

核心源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import java.util.concurrent.Semaphore;

/**
* 可变许可证<br>
*/
public class AdjustableSemaphore {
/**
* The Semaphore.
*/
private final ResizeableSemaphore semaphore = new ResizeableSemaphore();

/**
* 最大许可证数
*/
private int maxPermits = 0;

/**
* Instantiates a new Adjustable semaphore.
*/
public AdjustableSemaphore() {
}

/**
* 设置最大许可证数
*
* @param newMax 最大许可证数
*/
public synchronized void setMaxPermits(int newMax) {
if (newMax < 1) {
throw new IllegalArgumentException("Semaphore size must be at least 1, was:" + newMax);
}

int delta = newMax - this.maxPermits;

if (delta == 0) {
return;
} else if (delta > 0) {
//多释放几次,就可以达到信号量动态增加的效果了
this.semaphore.release(delta);
} else {
delta *= -1;
//减少信号量
this.semaphore.reducePermits(delta);
}

this.maxPermits = newMax;

}

/**
* 获取当前可用的许可证数量
*
* @return 当前可用的许可证数量
*/
public int availablePermits() {
return this.semaphore.availablePermits();
}

/**
* 释放1个许可证
*/
public void release() {
this.semaphore.release();
}

/**
* 当前线程尝试去获取1个许可证。
* <p>
* 此过程是非阻塞的,它只是在方法调用时进行一次尝试。
* <p>
* 如果当前线程获取了1个可用的许可证,则会停止等待,继续执行,并返回true。
* <p>
* 如果当前线程没有获得这个许可证,也会停止等待,继续执行,并返回false。
*
* @return the boolean
*/
public boolean tryAcquire() {
return this.semaphore.tryAcquire();
}


/**
* 可调节信号量 <br>
*/
private static final class ResizeableSemaphore extends Semaphore {

/**
* Instantiates a new Resizeable semaphore.
*/
ResizeableSemaphore() {
super(0);
}

/**
* 减少许可证
*
* @param reduction 减少许可证的数量
*/
@Override
protected void reducePermits(int reduction) {
super.reducePermits(reduction);
}

}

}