引言

年前有个需求,批量请求供应商API,要有限流操作,并支持TPS与并发数可配置,那时候简单的查了查资料,任务结束就过去了,最近又有个限流的小需求,所以又翻出了以前的代码。

本次简单记录一下令牌桶的实现:

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

令牌桶算法.png

实现思路:
用LinkedBlockingQueue作为装令牌的桶,Executors.newSingleThreadScheduledExecutor()作为定时器定时将令牌放入桶中,使用构建者模式的代码风格。忘了以前在哪抄的了,就这样吧。

贴上核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* TokenBucket<br>
*/
public class TokenBucket {

/**
* 每秒最多请求数量
*/
private int maxFlowRate;

/**
* 每秒平均请求数量
*/
private int avgFlowRate;

/**
* 队列来缓存桶数量
*/
private LinkedBlockingQueue<Byte> tokenQueue;

/**
* 由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制。
* 假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。
* 可以做成延迟计算的形式,每次请求令牌的时候,看当前时间是否晚与下一次生成令牌的时间,计算该段时间的令牌数,
* 加入令牌桶,更新数据。
*/
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

/**
* The Mutex do not use directly.
*/
private volatile Object mutexDoNotUseDirectly = new Object();
/**
* The Is start.
*/
private volatile boolean isStart = false;


/**
* The constant A_CHAR.
*/
private static final Byte A_CHAR = 'a';

/**
* Instantiates a new Token bucket.
*/
private TokenBucket() {
}

/**
* New builder token bucket.
*
* @return the token bucket
*/
public static TokenBucket newBuilder() {
return new TokenBucket();
}


/**
* 每秒内最大请求数量设置
*
* @param maxFlowRate 每秒内最大请求数量
* @return 当前令牌同
*/
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}

/**
* 每秒平均请求数量设置
*
* @param avgFlowRate 每秒平均请求数量
* @return 当前令牌同
*/
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}

/**
* 构造者模式
*
* @return the token bucket
*/
public TokenBucket build() {
//初始化
init();
//返回当前对象
return this;
}

/**
* 初始化
*/
private void init() {
//初始化桶队列大小
if (maxFlowRate > 0) {
tokenQueue = new LinkedBlockingQueue<>(maxFlowRate);
}
//初始化令牌生产者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
//每秒执行一次增加令牌操作
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
//系统启动
isStart = true;

}

/**
* 停止任务
*/
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}

/**
* 查看任务是否执行
*
* @return the boolean
*/
public boolean isStarted() {
return isStart;
}

/**
* 增加令牌
*
* @param tokenNum the token num
*/
private void addTokens(Integer tokenNum) {
// 若是桶已经满了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(A_CHAR);
}
}

/**
* 获取令牌
* <p>
* true:获取到1个令牌,非阻塞
* <p>
* false:未获取到令牌,非阻塞
*
* @return boolean
*/
public boolean tryAcquire() {
synchronized (mutexDoNotUseDirectly) {
// 否存在足够的桶数量
if (tokenQueue.size() > 0) {
//队列不为空时返回队首值并移除,队列为空时返回null。非阻塞立即返回。
Byte poll = tokenQueue.poll();
if (poll != null) {
//获取到令牌
return true;
}
}
}
//未获取到令牌
return false;
}


/**
* 令牌生产者 <br>
*/
private class TokenProducer implements Runnable {

/**
* 每次加入令牌的数量
*/
private int tokenNum;
/**
* 当前令牌桶
*/
private TokenBucket tokenBucket;

/**
* 令牌生产者构造方法
*
* @param tokenNum 每次加入令牌的数量
* @param tokenBucket 当前令牌桶
*/
private TokenProducer(int tokenNum, TokenBucket tokenBucket) {
this.tokenNum = tokenNum;
this.tokenBucket = tokenBucket;
}

@Override
public void run() {
//增加令牌
tokenBucket.addTokens(tokenNum);
}
}

}