前言

先读一遍JCache的README!!!

它主要要解决的问题是:

  1. 使用内存缓存时,一旦应用重启后,由于缓存数据丢失,缓存雪崩,给数据库造成巨大压力,导致应用堵塞。
  2. 使用内存缓存时,多个应用节点无法共享缓存数据。
  3. 使用集中式缓存,由于大量的数据通过缓存获取,导致缓存服务的数据吞吐量太大,带宽跑满。现象就是 Redis 服务负载不高,但是由于机器网卡带宽跑满,导致数据读取非常慢。

J2Cache启动的逻辑

从spring-boot2-starter的net.oschina.j2cache.autoconfigure.J2CacheAutoConfiguration为入口,开始看j2cache的启动逻辑。主要目的是弄明白net.oschina.j2cache.CacheChannel抽象类的对象组装。

1
2
3
4
5
6
@Bean
@DependsOn({"springUtil","j2CacheConfig"})
public CacheChannel cacheChannel(net.oschina.j2cache.J2CacheConfig j2CacheConfig) throws IOException {
J2CacheBuilder builder = J2CacheBuilder.init(j2CacheConfig);
return builder.getChannel();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private CacheChannel channel;
private AtomicBoolean opened = new AtomicBoolean(false);
/**
* 返回缓存操作接口
*
* @return CacheChannel
*/
public CacheChannel getChannel() {
if (this.channel == null || !this.opened.get()) {
synchronized (J2CacheBuilder.class) {
if (this.channel == null || !this.opened.get()) {
//加载配置
this.initFromConfig(config);
/* 初始化缓存接口 */
this.channel = new CacheChannel(config, holder) {
@Override
public void sendClearCmd(String region) {
policy.sendClearCmd(region);
}

@Override
public void sendEvictCmd(String region, String... keys) {
policy.sendEvictCmd(region, keys);
}

@Override
public void close() {
super.close();
policy.disconnect();
holder.shutdown();
opened.set(false);
}
};
this.opened.set(true);
}
}
}
return this.channel;
}
/**
* 加载配置
*
* @return
* @throws IOException
*/
private void initFromConfig(J2CacheConfig config) {
//初始化序列化器
SerializationUtils.init(config.getSerialization(), config.getSubProperties(config.getSerialization()));
//初始化两级的缓存管理
this.holder = CacheProviderHolder.init(config, (region, key) -> {
//当一级缓存中的对象失效时,自动清除二级缓存中的数据
Level2Cache level2 = this.holder.getLevel2Cache(region);
level2.evict(key);
if (!level2.supportTTL()) {
//再一次清除一级缓存是为了避免缓存失效时再次从 L2 获取到值
this.holder.getLevel1Cache(region).evict(key);
}
log.debug("Level 1 cache object expired, evict level 2 cache object [{},{}]", region, key);
if (policy != null)
policy.sendEvictCmd(region, key);
});
//初始化广播策略
policy = ClusterPolicyFactory.init(holder, config.getBroadcast(), config.getBroadcastProperties());
log.info("Using cluster policy : {}", policy.getClass().getName());
}

从上述代码中可以看到:

  • 使用了单例模式,且使用volatile和synchronized关键字满足双重检查锁机制。
  • 加载配置代码中,SerializationUtils#init、ClusterPolicyFactory#init底层使用工厂模式来生成Serializer和ClusterPolicy接口的对象。
  • 初始化两级的缓存管理CacheProviderHolder#init(J2CacheConfig config, CacheExpiredListener listener) ,第二个入参使用了lambda表达式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Initialize Cache Provider
*
* @param config j2cache config instance
* @param listener cache listener
* @return holder : return CacheProviderHolder instance
*/
public static CacheProviderHolder init(J2CacheConfig config, CacheExpiredListener listener) {

CacheProviderHolder holder = new CacheProviderHolder();

holder.listener = listener;
holder.l1_provider = loadProviderInstance(config.getL1CacheName());
if (!holder.l1_provider.isLevel(CacheObject.LEVEL_1))
throw new CacheException(holder.l1_provider.getClass().getName() + " is not level_1 cache provider");
holder.l1_provider.start(config.getL1CacheProperties());
log.info("Using L1 CacheProvider : {}", holder.l1_provider.getClass().getName());

holder.l2_provider = loadProviderInstance(config.getL2CacheName());
if (!holder.l2_provider.isLevel(CacheObject.LEVEL_2))
throw new CacheException(holder.l2_provider.getClass().getName() + " is not level_2 cache provider");
holder.l2_provider.start(config.getL2CacheProperties());
log.info("Using L2 CacheProvider : {}", holder.l2_provider.getClass().getName());

return holder;
}

代码中根据配置获取一级、二级缓存的提供者接口net.oschina.j2cache.CacheProvider的实现对象,我自己使用的是EhCacheProvier和SpringRedisProvider(可以参考下图配置)。并调用相应的net.oschina.j2cache.CacheProvider#start方法。

img

经过上面分析,基本上了解了整个J2Cache启动的逻辑。下面主要看一下EhCacheProvier和SpringRedisProvider的start方法。

EhCacheProvier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private CacheManager manager;
private ConcurrentHashMap<String, EhCache> caches;
/**
* init ehcache config
*
* @param props current configuration settings.
*/
public void start(Properties props) {
if (manager != null) {
log.warn("Attempt to restart an already started EhCacheProvider.");
return;
}

// 如果指定了名称,那么尝试获取已有实例
String ehcacheName = (String)props.get(KEY_EHCACHE_NAME);
if (ehcacheName != null && ehcacheName.trim().length() > 0)
manager = CacheManager.getCacheManager(ehcacheName);
if (manager == null) {
// 指定了配置文件路径? 加载之
if (props.containsKey(KEY_EHCACHE_CONFIG_XML)) {
String propertiesFile = props.getProperty(KEY_EHCACHE_CONFIG_XML);
URL url = getClass().getResource(propertiesFile);
url = (url == null) ? getClass().getClassLoader().getResource(propertiesFile) : url;
manager = CacheManager.newInstance(url);
} else {
// 加载默认实例
manager = CacheManager.getInstance();
}
}
caches = new ConcurrentHashMap<>();
}
  • 属性manager为:net.sf.ehcache.CacheManager
  • 属性caches通过ConcurrentHashMap维护一组EhCache(主要维护了net.sf.ehcache.Cache,用于操作缓存数据,以及一个对缓存数据失效的侦听)。
  • start方法中初始化了manager及caches的实现。
  • CacheManager.getInstance()底层也是volatile和synchronized实现的双重检查锁单例的实现。

SpringRedisProvider

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void start(Properties props) {
//用户拼接redis key
this.namespace = props.getProperty("namespace");
//执行存储方式,key-val 或者 hash存储
this.storage = props.getProperty("storage");
this.config = SpringUtil.getBean(net.oschina.j2cache.autoconfigure.J2CacheConfig.class);
if(config.getL2CacheOpen() == false) {
return;
}
//J2CacheSpringRedisAutoConfiguration#j2CacheRedisTemplate
this.redisTemplate = SpringUtil.getBean("j2CacheRedisTemplate", RedisTemplate.class);
}

这里看起来比较简单,底层直接操作的org.springframework.data.redis.core.RedisTemplate,这个redis客户端bean是在spring-boot2-starter的J2CacheSpringRedisAutoConfiguration类中配置的。

从J2Cache获取缓存数据源码分析

  • 一级缓存使用EnCache,二级缓存使用SpringRedis。
  • net.oschina.j2cache.CacheChannel#get方法为入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private static final Map<String, Object> _g_keyLocks = new ConcurrentHashMap<>();
/**
* 读取缓存(用户无需判断返回的对象是否为空)
* @param region Cache region name
* @param key Cache data key
* @param cacheNullObject 是否缓存空对象
* @return cache object
*/
public CacheObject get(String region, String key, boolean...cacheNullObject) {

if(closed)
throw new IllegalStateException("CacheChannel closed");

CacheObject obj = new CacheObject(region, key, CacheObject.LEVEL_1);
//从一级缓存获取值
obj.setValue(holder.getLevel1Cache(region).get(key));
if(obj.rawValue() != null)
return obj;

String lock_key = key + '%' + region;
//借助synchronized和ConcurrentHashMap实现更细粒度的对象锁
synchronized (_g_keyLocks.computeIfAbsent(lock_key, v -> new Object())) {
//再次从一级缓存获取值
obj.setValue(holder.getLevel1Cache(region).get(key));
if(obj.rawValue() != null)
return obj;

try {
obj.setLevel(CacheObject.LEVEL_2);
//从二级缓存获取值
obj.setValue(holder.getLevel2Cache(region).get(key));
if (obj.rawValue() != null) {
//设置一级缓存的值
holder.getLevel1Cache(region).put(key, obj.rawValue());
}else {
//支持缓存null值
boolean cacheNull = (cacheNullObject.length > 0) ? cacheNullObject[0] : defaultCacheNullObject;
if (cacheNull)
set(region, key, newNullObject(), true);
}
} finally {
//删除无用锁的对象
_g_keyLocks.remove(lock_key);
}
}

return obj;
}

流程:

  • 先从一级缓存获取值。
  • 如果不存在,借助synchronized和ConcurrentHashMap实现更细粒度的对象锁,加锁。
  • 加锁成功后,再次从一级缓存获取值。
  • 如果依旧不存在,从二级缓存获取值,获取到值后,设置一级缓存的值。

一级缓存获取值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
holder.getLevel1Cache(region).get(key)
/**
* Builds a Cache.
*
* @param regionName the regionName of the cache. Must match a cache configured in ehcache.xml
* @param listener cache listener
* @return a newly built cache will be built and initialised
*/
@Override
public EhCache buildCache(String regionName, CacheExpiredListener listener) {
return caches.computeIfAbsent(regionName, v -> {
net.sf.ehcache.Cache cache = manager.getCache(regionName);
if (cache == null) {
manager.addCache(regionName);
cache = manager.getCache(regionName);
log.warn("Could not find configuration [{}]; using defaults (TTL:{} seconds).", regionName, cache.getCacheConfiguration().getTimeToLiveSeconds());
}
return new EhCache(cache, listener);
});
}
public Serializable get(String key) {
if ( key == null )
return null;
Element elem = cache.get( key );
Object obj = (elem != null)?elem.getObjectValue() : null;
return (obj == null || obj.getClass().equals(Object.class))?null:(Serializable)elem.getObjectValue();

主要作用就是通过net.sf.ehcache.CacheManager获取net.sf.ehcache.Cache,组装好EhCache后,通过

EhCache#get()方法,获取net.sf.ehcache.Cache中的值。

二级缓存获取值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
holder.getLevel2Cache(region).get(key)
@Override
public Cache buildCache(String region, CacheExpiredListener listener) {
if(config.getL2CacheOpen() == false) {
return new NullCache();
}
Cache cache = caches.get(region);
if (cache == null) {
synchronized (SpringRedisProvider.class) {
cache = caches.get(region);
if (cache == null) {
if("hash".equalsIgnoreCase(this.storage))
cache = new SpringRedisCache(this.namespace, region, redisTemplate);
else {
cache = new SpringRedisGenericCache(this.namespace, region, redisTemplate);
}
caches.put(region, cache);
}
}
}
return cache;
}

我是用的是key-val存储,对比hash存储的优势是可以配置key失效时间,更加灵活,也就是说代码中的cache为SpringRedisGenericCache对象。在SpringRedisGenericCache中,获取数据直接操作的redis客户端,因为涉及到网络IO,所以有反序列化操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
default Object get(String key) {
byte[] bytes = getBytes(key);
try {
return SerializationUtils.deserialize(bytes);
log.warn("Failed to deserialize object with key:" + key + ",message: " + e.getMessage());
evict(key);
return null;
} catch (IOException e) {
throw new CacheException(e);
}
}
@Override
public byte[] getBytes(String key) {
return redisTemplate.execute((RedisCallback<byte[]>) redis -> {
return redis.get(_key(key));
});
}
private byte[] _key(String key) {
byte[] k;
try {
k = (this.region + ":" + key).getBytes("utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
k = (this.region + ":" + key).getBytes();
}
return k;
}

J2Cache插入数据,缓存一致性是怎么解决的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Write data to J2Cache
*
* @param region: Cache Region name
* @param key: Cache key
* @param value: Cache value
* @param cacheNullObject if allow cache null object
*/
public void set(String region, String key, Object value, boolean cacheNullObject) {

if (!cacheNullObject && value == null)
return ;

if(closed)
throw new IllegalStateException("CacheChannel closed");

try {
Level1Cache level1 = holder.getLevel1Cache(region);
level1.put(key, (value==null && cacheNullObject)?newNullObject():value);
Level2Cache level2 = holder.getLevel2Cache(region);
if(config.isSyncTtlToRedis())
level2.put(key, (value==null && cacheNullObject)?newNullObject():value, level1.ttl());
else
level2.put(key, (value==null && cacheNullObject)?newNullObject():value);
} finally {
this.sendEvictCmd(region, key);//清除原有的一级缓存的内容
}
}

假设有两个应用:A和B,其中应用A收到set请求后:插入一级缓存,再插入二级缓存,最后发送清除命令。

在发送清除命令之前,应用B中的一级缓存还是旧的值,此时存在数据不一致问题。

以SpringRedisPubSubPolicy的pubsub为例,看一下sendEvictCmd方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 发送清除缓存的命令
* @param region 区域名称
* @param keys 缓存键值
*/
default void sendEvictCmd(String region, String...keys) {
publish(new Command(Command.OPT_EVICT_KEY, region, keys));
}
@Override
public void publish(Command cmd) {
if(!isActive && config.getL2CacheOpen()) {
cmd.setSrc(LOCAL_COMMAND_ID);
redisTemplate.convertAndSend(this.channel, cmd.json());
}
}

消息发到Redis后,应用B怎么监听处理的呢?

原来在J2Cache启动,通过ClusterPolicyFactory初始化ClusterPolicy接口的对象SpringRedisPubSubPolicy时,调用其connect方法,其中通过配置,确定缓存清除模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 启用缓存变更广播redis实现 - SpringBoot Redis自定义实现
j2cache.broadcast = net.oschina.j2cache.cache.support.redis.SpringRedisPubSubPolicy

# 缓存清除模式
# active:主动清除,二级缓存过期主动通知各节点清除,优点在于所有节点可以同时收到缓存清除
# passive:被动清除(默认),一级缓存过期进行通知各节点清除一二级缓存
# blend:两种模式一起运作,对于各个节点缓存准确性以及及时性要求高的可以使用(推荐使用前面两种模式中一种)
j2cache.cache-clean-mode=active
@Override
public void connect(Properties props, CacheProviderHolder holder) {
this.holder = holder;
this.config = SpringUtil.getBean(net.oschina.j2cache.autoconfigure.J2CacheConfig.class);
if(config.getL2CacheOpen() == false) {
return;
}
J2CacheConfig j2config = SpringUtil.getBean(J2CacheConfig.class);
this.redisTemplate = SpringUtil.getBean("j2CacheRedisTemplate", RedisTemplate.class);
String channel_name = j2config.getL2CacheProperties().getProperty("channel");
if(channel_name != null && !channel_name.isEmpty()) {
this.channel = channel_name;
}
RedisMessageListenerContainer listenerContainer = SpringUtil.getBean("j2CacheRedisMessageListenerContainer", RedisMessageListenerContainer.class);
String namespace = j2config.getL2CacheProperties().getProperty("namespace");
String database = j2config.getL2CacheProperties().getProperty("database");
String expired = "__keyevent@" + (database == null || "".equals(database) ? "0" : database) + "__:expired";
String del = "__keyevent@" + (database == null || "".equals(database) ? "0" : database) + "__:del";
List<PatternTopic> topics = new ArrayList<>();
topics.add(new PatternTopic(expired));
topics.add(new PatternTopic(del));

if("active".equals(config.getCacheCleanMode())) {
isActive = true;
//设置键值回调 需要redis支持键值回调
ConfigureNotifyKeyspaceEventsAction action = new ConfigureNotifyKeyspaceEventsAction();
action.config(listenerContainer.getConnectionFactory().getConnection());
listenerContainer.addMessageListener(new SpringRedisActiveMessageListener(this, namespace), topics);
}else if("blend".equals(config.getCacheCleanMode())) {
//设置键值回调 需要redis支持键值回调
ConfigureNotifyKeyspaceEventsAction action = new ConfigureNotifyKeyspaceEventsAction();
action.config(listenerContainer.getConnectionFactory().getConnection());
listenerContainer.addMessageListener(new SpringRedisActiveMessageListener(this, namespace), topics);
listenerContainer.addMessageListener(new SpringRedisMessageListener(this, this.channel), new PatternTopic(this.channel));
}else {
listenerContainer.addMessageListener(new SpringRedisMessageListener(this, this.channel), new PatternTopic(this.channel));
}

}

代码28行,增加了redis消息监听:SpringRedisActiveMessageListener,只会监听:key超时和key删除事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onMessage(Message message, byte[] pattern) {
String key = message.toString();
if (key == null) {
return;
}
if (key.startsWith(namespace + ":")) {
String[] k = key.replaceFirst(namespace + ":", "").split(":", 2);
if(k.length != 2) {
return;
}
clusterPolicy.evict(k[0], k[1]);
}
}

在SpringRedisActiveMessageListener中只会处理特定namespace的key,并调用 clusterPolicy.evict(k[0], k[1])清空缓存方法,清空一级缓存。

1
2
3
4
5
6
7
8
/**
* 删除本地某个缓存条目
* @param region 区域名称
* @param keys 缓存键值
*/
public void evict(String region, String... keys) {
holder.getLevel1Cache(region).evict(keys);
}

参考

并发情况下,单例模式之双重检验锁陷阱