文章首发于:clawhub.club


有的时候,一些需求过于简单,不必要用很复杂的方式解决,比如不引入中间件。这个ExpiringMap就是其中的一个产物,当然,单机环境下它仍然有自己的用途。
ExpiringMap包含一个工作线程,这个线程定期扫描现有的键值对中,是否有过期的数据,如果有的话就删除,并且支持实现过期时监听事件。
源码相对来说比较简单,是mina中的一个工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* A map with expiration. This class contains a worker thread that will
* periodically check this class in order to determine if any objects
* should be removed based on the provided time-to-live value.
* 带过期时间的Map。这个类包含一个工作线程,它将定期检查这个类,以便根据提供的生存时间值确定是否应该删除任何对象。
*
* @param <K> The key type
* @param <V> The value type
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public class ExpiringMap<K, V> implements Map<K, V> {
/**
* The default value, 60 seconds
*/
public static final int DEFAULT_TIME_TO_LIVE = 60;

/**
* The default value, 1 second
*/
public static final int DEFAULT_EXPIRATION_INTERVAL = 1;

private static volatile int expirerCount = 1;

private final ConcurrentHashMap<K, ExpiringObject> delegate;

private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;

private final Expirer expirer;

/**
* Creates a new instance of ExpiringMap using the default values
* DEFAULT_TIME_TO_LIVE and DEFAULT_EXPIRATION_INTERVAL
*/
public ExpiringMap() {
this(DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL);
}

/**
* Creates a new instance of ExpiringMap using the supplied
* time-to-live value and the default value for DEFAULT_EXPIRATION_INTERVAL
*
* @param timeToLive The time-to-live value (seconds)
*/
public ExpiringMap(int timeToLive) {
this(timeToLive, DEFAULT_EXPIRATION_INTERVAL);
}

/**
* Creates a new instance of ExpiringMap using the supplied values and
* a {@link ConcurrentHashMap} for the internal data structure.
*
* @param timeToLive The time-to-live value (seconds)
* @param expirationInterval The time between checks to see if a value should be removed (seconds)
*/
public ExpiringMap(int timeToLive, int expirationInterval) {
this(new ConcurrentHashMap<>(), new CopyOnWriteArrayList<>(), timeToLive,
expirationInterval);
}

private ExpiringMap(ConcurrentHashMap<K, ExpiringObject> delegate,
CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners, int timeToLive, int expirationInterval) {
this.delegate = delegate;
this.expirationListeners = expirationListeners;

this.expirer = new Expirer();
expirer.setTimeToLive(timeToLive);
expirer.setExpirationInterval(expirationInterval);
//开启监听线程
expirer.startExpiring();
}

/**
* {@inheritDoc}
*/
@Override
public V put(K key, V value) {
ExpiringObject answer = delegate.put(key, new ExpiringObject(key, value, System.currentTimeMillis()));

if (answer == null) {
return null;
}

return answer.getValue();
}

/**
* {@inheritDoc}
*/
@Override
public V get(Object key) {
ExpiringObject object = delegate.get(key);

if (object != null) {
object.setLastAccessTime(System.currentTimeMillis());

return object.getValue();
}

return null;
}

/**
* {@inheritDoc}
*/
@Override
public V remove(Object key) {
ExpiringObject answer = delegate.remove(key);
if (answer == null) {
return null;
}

return answer.getValue();
}

/**
* {@inheritDoc}
*/
@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key);
}

/**
* {@inheritDoc}
*/
@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}

/**
* {@inheritDoc}
*/
@Override
public int size() {
return delegate.size();
}

/**
* {@inheritDoc}
*/
@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

/**
* {@inheritDoc}
*/
@Override
public void clear() {
delegate.clear();
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return delegate.hashCode();
}

/**
* {@inheritDoc}
*/
@Override
public Set<K> keySet() {
return delegate.keySet();
}

/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}

/**
* {@inheritDoc}
*/
@Override
public void putAll(Map<? extends K, ? extends V> inMap) {
for (Entry<? extends K, ? extends V> e : inMap.entrySet()) {
this.put(e.getKey(), e.getValue());
}
}

/**
* {@inheritDoc}
*/
@Override
public Collection<V> values() {
throw new UnsupportedOperationException();
}

/**
* {@inheritDoc}
*/
@Override
public Set<Map.Entry<K, V>> entrySet() {
throw new UnsupportedOperationException();
}

/**
* Adds a listener in the expiration listeners
*
* @param listener The listener to add
*/
public void addExpirationListener(ExpirationListener<V> listener) {
expirationListeners.add(listener);
}

/**
* Removes a listener from the expiration listeners
*
* @param listener The listener to remove
*/
public void removeExpirationListener(ExpirationListener<V> listener) {
expirationListeners.remove(listener);
}

/**
* @return The Expirer instance
*/
public Expirer getExpirer() {
return expirer;
}

/**
* Get the interval in which an object will live in the map before it is removed.
*
* @return The expiration time in second
*/
public int getExpirationInterval() {
return expirer.getExpirationInterval();
}

/**
* @return the Time-to-live value in seconds.
*/
public int getTimeToLive() {
return expirer.getTimeToLive();
}

/**
* Set the interval in which an object will live in the map before it is removed.
*
* @param expirationInterval The expiration time in seconds
*/
public void setExpirationInterval(int expirationInterval) {
expirer.setExpirationInterval(expirationInterval);
}

/**
* Update the value for the time-to-live
*
* @param timeToLive The time-to-live (seconds)
*/
public void setTimeToLive(int timeToLive) {
expirer.setTimeToLive(timeToLive);
}

private class ExpiringObject {
private K key;

private V value;

private long lastAccessTime;

private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();

ExpiringObject(K key, V value, long lastAccessTime) {
if (value == null) {
throw new IllegalArgumentException("An expiring object cannot be null.");
}

this.key = key;
this.value = value;
this.lastAccessTime = lastAccessTime;
}

public long getLastAccessTime() {
lastAccessTimeLock.readLock().lock();

try {
return lastAccessTime;
} finally {
lastAccessTimeLock.readLock().unlock();
}
}

public void setLastAccessTime(long lastAccessTime) {
lastAccessTimeLock.writeLock().lock();

try {
this.lastAccessTime = lastAccessTime;
} finally {
lastAccessTimeLock.writeLock().unlock();
}
}

public K getKey() {
return key;
}

public V getValue() {
return value;
}

@Override
public boolean equals(Object obj) {
return value.equals(obj);
}

@Override
public int hashCode() {
return value.hashCode();
}
}

/**
* A Thread that monitors an {@link ExpiringMap} and will remove
* elements that have passed the threshold.
* 一个线程,它监视{@link ExpiringMap},并将删除超过阈值的元素。
*/
public class Expirer implements Runnable {
private final ReadWriteLock stateLock = new ReentrantReadWriteLock();

private long timeToLiveMillis;

private long expirationIntervalMillis;

private boolean running = false;

private final Thread expirerThread;

/**
* Creates a new instance of Expirer.
*/
public Expirer() {
expirerThread = new Thread(this, "ExpiringMapExpirer-" + expirerCount++);
//守护线程
expirerThread.setDaemon(true);
}

/**
* {@inheritDoc}
*/
@Override
public void run() {
while (running) {
processExpires();

try {
Thread.sleep(expirationIntervalMillis);
} catch (InterruptedException e) {
// Do nothing
}
}
}

private void processExpires() {
long timeNow = System.currentTimeMillis();

for (ExpiringObject o : delegate.values()) {

if (timeToLiveMillis <= 0) {
continue;
}

long timeIdle = timeNow - o.getLastAccessTime();

if (timeIdle >= timeToLiveMillis) {
delegate.remove(o.getKey());

for (ExpirationListener<V> listener : expirationListeners) {
listener.expired(o.getValue());
}
}
}
}

/**
* Kick off this thread which will look for old objects and remove them.
* 启动这个线程,它将查找旧对象并删除它们。
*/
public void startExpiring() {
stateLock.writeLock().lock();

try {
if (!running) {
running = true;
expirerThread.start();
}
} finally {
stateLock.writeLock().unlock();
}
}

/**
* If this thread has not started, then start it.
* Otherwise just return;
* 如果这个线程还没有启动,那么启动它。否则就返回;
*/
public void startExpiringIfNotStarted() {
stateLock.readLock().lock();

try {
if (running) {
return;
}
} finally {
stateLock.readLock().unlock();
}

stateLock.writeLock().lock();

try {
if (!running) {
running = true;
expirerThread.start();
}
} finally {
stateLock.writeLock().unlock();
}
}

/**
* Stop the thread from monitoring the map.
* 停止线程监视Map。
*/
public void stopExpiring() {
stateLock.writeLock().lock();

try {
if (running) {
running = false;
expirerThread.interrupt();
}
} finally {
stateLock.writeLock().unlock();
}
}

/**
* Checks to see if the thread is running
*
* @return If the thread is running, true. Otherwise false.
*/
public boolean isRunning() {
stateLock.readLock().lock();

try {
return running;
} finally {
stateLock.readLock().unlock();
}
}

/**
* @return the Time-to-live value in seconds.
*/
public int getTimeToLive() {
stateLock.readLock().lock();

try {
return (int) timeToLiveMillis / 1000;
} finally {
stateLock.readLock().unlock();
}
}

/**
* Update the value for the time-to-live
*
* @param timeToLive The time-to-live (seconds)
*/
public void setTimeToLive(long timeToLive) {
stateLock.writeLock().lock();

try {
this.timeToLiveMillis = timeToLive * 1000;
} finally {
stateLock.writeLock().unlock();
}
}

/**
* Get the interval in which an object will live in the map before
* it is removed.
*
* @return The time in seconds.
*/
public int getExpirationInterval() {
stateLock.readLock().lock();

try {
return (int) expirationIntervalMillis / 1000;
} finally {
stateLock.readLock().unlock();
}
}

/**
* Set the interval in which an object will live in the map before
* it is removed.
*
* @param expirationInterval The time in seconds
*/
public void setExpirationInterval(long expirationInterval) {
stateLock.writeLock().lock();

try {
this.expirationIntervalMillis = expirationInterval * 1000;
} finally {
stateLock.writeLock().unlock();
}
}
}

/**
* A listener for expired object events.
* 过期对象事件的侦听器。
*
* @param <E> The event type
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface ExpirationListener<E> {
/**
* Adds a given event to the listener
* 向侦听器添加给定的事件
*
* @param expiredObject The expired event
*/
void expired(E expiredObject);
}

}


我将ExpirationListener监听接口也一并放到了ExpiringMap中。

  • 存储键值对的是:ConcurrentHashMap<K, ExpiringObject> delegate,其中ExpiringObject对象存放了键值与最后访问时间(新增和查看时会更新这个时间)。
  • 存储配置的监听器的是:CopyOnWriteArrayList<ExpirationListener> expirationListeners,键值对过期是,迭代所有配置的监听器,回调。
  • 监控器由Expirer实现,初始化时,新建一个守护线程,定期扫描所有的键值对,过期即删除,监听回调也是在这里调用。
  • 监控器的数量由 private static volatile int expirerCount;表示。多个Map实例化时,可获取监控器的数量。

这个类中用了ConcurrentHashMap,CopyOnWriteArrayList,ReentrantReadWriteLock等工具类,所以并发访问时线程安全。