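Preface
Read the J2Cache README first!
The main problems it sets out to solve are:
- With an in-memory cache, the cached data is lost when the application restarts. The resulting cache avalanche puts enormous pressure on the database and can stall the application.
- With an in-memory cache, multiple application nodes cannot share cached data.
- With a centralized cache, so much data is read through the cache that the cache service's throughput becomes too high and bandwidth is saturated. The symptom is that the Redis server's load is low, yet reads are very slow because the machine's network interface is saturated.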
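J2Cache startup logic
We start from net.oschina.j2cache.autoconfigure.J2CacheAutoConfiguration in spring-boot2-starter and follow J2Cache's startup logic from there. The main goal is to understand how an instance of the abstract class net.oschina.j2cache.CacheChannel is assembled.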
    @Bean
    @DependsOn({"springUtil", "j2CacheConfig"})
    public CacheChannel cacheChannel(net.oschina.j2cache.J2CacheConfig j2CacheConfig) throws IOException {
        J2CacheBuilder builder = J2CacheBuilder.init(j2CacheConfig);
        return builder.getChannel();
    }
    private CacheChannel channel;
    private AtomicBoolean opened = new AtomicBoolean(false);

    public CacheChannel getChannel() {
        // First check without the lock, second check while holding it.
        if (this.channel == null || !this.opened.get()) {
            synchronized (J2CacheBuilder.class) {
                if (this.channel == null || !this.opened.get()) {
                    this.initFromConfig(config);
                    this.channel = new CacheChannel(config, holder) {
                        @Override
                        public void sendClearCmd(String region) {
                            policy.sendClearCmd(region);
                        }

                        @Override
                        public void sendEvictCmd(String region, String... keys) {
                            policy.sendEvictCmd(region, keys);
                        }

                        @Override
                        public void close() {
                            super.close();
                            policy.disconnect();
                            holder.shutdown();
                            opened.set(false);
                        }
                    };
                    this.opened.set(true);
                }
            }
        }
        return this.channel;
    }

    private void initFromConfig(J2CacheConfig config) {
        SerializationUtils.init(config.getSerialization(), config.getSubProperties(config.getSerialization()));
        // The lambda is the CacheExpiredListener: it runs when a level-1 entry expires.
        this.holder = CacheProviderHolder.init(config, (region, key) -> {
            Level2Cache level2 = this.holder.getLevel2Cache(region);
            level2.evict(key);
            if (!level2.supportTTL()) {
                this.holder.getLevel1Cache(region).evict(key);
            }
            log.debug("Level 1 cache object expired, evict level 2 cache object [{},{}]", region, key);
            if (policy != null)
                policy.sendEvictCmd(region, key);
        });
        policy = ClusterPolicyFactory.init(holder, config.getBroadcast(), config.getBroadcastProperties());
        log.info("Using cluster policy : {}", policy.getClass().getName());
    }
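The code above shows that:
- It uses the singleton pattern with double-checked locking built on synchronized. Note that in the snippet shown the channel field is not declared volatile; the opened flag is an AtomicBoolean.
- When loading the configuration, SerializationUtils#init and ClusterPolicyFactory#init use the factory pattern to create the Serializer and ClusterPolicy instances.
- The two-level cache manager is initialized through CacheProviderHolder#init(J2CacheConfig config, CacheExpiredListener listener); the second argument is passed as a lambda expression.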
    public static CacheProviderHolder init(J2CacheConfig config, CacheExpiredListener listener) {
        CacheProviderHolder holder = new CacheProviderHolder();
        holder.listener = listener;

        holder.l1_provider = loadProviderInstance(config.getL1CacheName());
        if (!holder.l1_provider.isLevel(CacheObject.LEVEL_1))
            throw new CacheException(holder.l1_provider.getClass().getName() + " is not level_1 cache provider");
        holder.l1_provider.start(config.getL1CacheProperties());
        log.info("Using L1 CacheProvider : {}", holder.l1_provider.getClass().getName());

        holder.l2_provider = loadProviderInstance(config.getL2CacheName());
        if (!holder.l2_provider.isLevel(CacheObject.LEVEL_2))
            throw new CacheException(holder.l2_provider.getClass().getName() + " is not level_2 cache provider");
        holder.l2_provider.start(config.getL2CacheProperties());
        log.info("Using L2 CacheProvider : {}", holder.l2_provider.getClass().getName());

        return holder;
    }
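Based on the configuration, the code obtains the level-1 and level-2 implementations of the provider interface net.oschina.j2cache.CacheProvider and calls net.oschina.j2cache.CacheProvider#start on each of them. I use EhCacheProvider and SpringRedisProvider (see the configuration figure below).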
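To make the lookup-then-verify step concrete, here is a minimal sketch of resolving a provider by its configured name, checking its level, and starting it. The SketchProvider interface, the registry map, and the names "ehcache" and "redis" are illustrative assumptions; the body of J2Cache's own loadProviderInstance is not shown in this post and may work differently.

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

/** Hypothetical stand-in for a cache provider; not J2Cache's CacheProvider. */
interface SketchProvider {
    int level();                  // 1 = in-process cache, 2 = shared cache
    void start(Properties props);
    boolean started();
}

final class SimpleProvider implements SketchProvider {
    private final int level;
    private boolean started;

    SimpleProvider(int level) { this.level = level; }

    @Override public int level() { return level; }
    @Override public void start(Properties props) { started = true; }
    @Override public boolean started() { return started; }
}

final class ProviderLoaderSketch {
    // Assumed registry: configured name -> factory for the provider.
    private static final Map<String, Supplier<SketchProvider>> REGISTRY = Map.of(
            "ehcache", () -> new SimpleProvider(1),
            "redis", () -> new SimpleProvider(2));

    /** Resolves the provider, rejects it if it has the wrong level, then starts it. */
    static SketchProvider load(String name, int expectedLevel, Properties props) {
        Supplier<SketchProvider> factory = REGISTRY.get(name);
        if (factory == null)
            throw new IllegalArgumentException("Unknown provider: " + name);
        SketchProvider provider = factory.get();
        if (provider.level() != expectedLevel)
            throw new IllegalStateException(name + " is not a level_" + expectedLevel + " cache provider");
        provider.start(props);
        return provider;
    }
}
```

Failing fast on a level mismatch means a misconfigured provider name is reported at startup instead of surfacing later as odd cache behavior.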
With that, the overall J2Cache startup logic is mostly clear. Next, let's look at the start methods of EhCacheProvider and SpringRedisProvider.
EhCacheProvider
    private CacheManager manager;
    private ConcurrentHashMap<String, EhCache> caches;

    public void start(Properties props) {
        if (manager != null) {
            log.warn("Attempt to restart an already started EhCacheProvider.");
            return;
        }

        String ehcacheName = (String) props.get(KEY_EHCACHE_NAME);
        if (ehcacheName != null && ehcacheName.trim().length() > 0)
            manager = CacheManager.getCacheManager(ehcacheName);
        if (manager == null) {
            if (props.containsKey(KEY_EHCACHE_CONFIG_XML)) {
                String propertiesFile = props.getProperty(KEY_EHCACHE_CONFIG_XML);
                URL url = getClass().getResource(propertiesFile);
                url = (url == null) ? getClass().getClassLoader().getResource(propertiesFile) : url;
                manager = CacheManager.newInstance(url);
            } else {
                manager = CacheManager.getInstance();
            }
        }
        caches = new ConcurrentHashMap<>();
    }
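- The manager field is a net.sf.ehcache.CacheManager.
- The caches field is a ConcurrentHashMap holding a set of EhCache objects. Each EhCache mainly wraps a net.sf.ehcache.Cache, which is used to operate on the cached data, plus a listener for cache-entry expiry.
- The start method initializes both manager and caches.
- Under the hood, CacheManager.getInstance() is also a double-checked-locking singleton implemented with volatile and synchronized.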
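For reference, the double-checked-locking idiom mentioned in the last bullet looks roughly like the sketch below. LazySingletonSketch is a made-up class used only for illustration; it is not Ehcache's or J2Cache's code.

```java
final class LazySingletonSketch {
    // volatile guarantees that a thread which sees a non-null reference
    // also sees the fully constructed object behind it.
    private static volatile LazySingletonSketch instance;

    private LazySingletonSketch() { }

    static LazySingletonSketch getInstance() {
        LazySingletonSketch local = instance;          // first check, no lock
        if (local == null) {
            synchronized (LazySingletonSketch.class) {
                local = instance;                      // second check, under the lock
                if (local == null) {
                    local = new LazySingletonSketch();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

Reading the field into a local variable keeps the common path to a single volatile read; without volatile on the field, the first unlocked check is not safe under the Java memory model.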
SpringRedisProvider
    @Override
    public void start(Properties props) {
        this.namespace = props.getProperty("namespace");
        this.storage = props.getProperty("storage");
        this.config = SpringUtil.getBean(net.oschina.j2cache.autoconfigure.J2CacheConfig.class);
        // Nothing to set up when the level-2 cache is switched off.
        if (config.getL2CacheOpen() == false) {
            return;
        }
        this.redisTemplate = SpringUtil.getBean("j2CacheRedisTemplate", RedisTemplate.class);
    }
This one looks fairly simple. Underneath, it works directly with org.springframework.data.redis.core.RedisTemplate; that Redis client bean is configured in the J2CacheSpringRedisAutoConfiguration class of spring-boot2-starter.
Source analysis: reading cached data from J2Cache
- The level-1 cache is EhCache and the level-2 cache is SpringRedis.
- The entry point is the net.oschina.j2cache.CacheChannel#get method.
    private static final Map<String, Object> _g_keyLocks = new ConcurrentHashMap<>();

    public CacheObject get(String region, String key, boolean... cacheNullObject) {
        if (closed)
            throw new IllegalStateException("CacheChannel closed");

        CacheObject obj = new CacheObject(region, key, CacheObject.LEVEL_1);
        obj.setValue(holder.getLevel1Cache(region).get(key));
        if (obj.rawValue() != null)
            return obj;

        // One lock object per region/key pair instead of one global lock.
        String lock_key = key + '%' + region;
        synchronized (_g_keyLocks.computeIfAbsent(lock_key, v -> new Object())) {
            // Another thread may have filled level 1 while we waited for the lock.
            obj.setValue(holder.getLevel1Cache(region).get(key));
            if (obj.rawValue() != null)
                return obj;

            try {
                obj.setLevel(CacheObject.LEVEL_2);
                obj.setValue(holder.getLevel2Cache(region).get(key));
                if (obj.rawValue() != null) {
                    holder.getLevel1Cache(region).put(key, obj.rawValue());
                } else {
                    boolean cacheNull = (cacheNullObject.length > 0) ? cacheNullObject[0] : defaultCacheNullObject;
                    if (cacheNull)
                        set(region, key, newNullObject(), true);
                }
            } finally {
                _g_keyLocks.remove(lock_key);
            }
        }
        return obj;
    }
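Flow:
- First try to read the value from the level-1 cache.
- If it is missing, take a lock. synchronized combined with a ConcurrentHashMap of lock objects gives a finer-grained, per-key lock.
- Once the lock is held, read from the level-1 cache again.
- If the value is still missing, read it from the level-2 cache and, when found, write it into the level-1 cache. If level 2 misses as well and null caching is enabled, a null placeholder is stored instead.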
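The steps above can be sketched with plain JDK maps. This is a simplified illustration, not J2Cache's implementation: both cache levels are ordinary maps, null-object caching is left out, and the class and method names are invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TwoLevelGetSketch {
    private final Map<String, Object> level1 = new ConcurrentHashMap<>(); // stand-in for the in-process cache
    private final Map<String, Object> level2;                             // stand-in for the shared cache
    private final Map<String, Object> keyLocks = new ConcurrentHashMap<>();

    TwoLevelGetSketch(Map<String, Object> level2) {
        this.level2 = level2;
    }

    Object get(String region, String key) {
        String cacheKey = region + ":" + key;
        Object value = level1.get(cacheKey);               // 1. try level 1 without locking
        if (value != null) return value;

        Object lock = keyLocks.computeIfAbsent(cacheKey, k -> new Object());
        synchronized (lock) {                              // 2. lock this key only
            try {
                value = level1.get(cacheKey);              // 3. re-check level 1
                if (value != null) return value;
                value = level2.get(cacheKey);              // 4. fall back to level 2
                if (value != null) level1.put(cacheKey, value); // backfill level 1
                return value;
            } finally {
                keyLocks.remove(cacheKey);
            }
        }
    }

    boolean inLevel1(String region, String key) {
        return level1.containsKey(region + ":" + key);
    }
}
```

The per-key lock means that concurrent misses on the same key result in one level-2 read while the other threads wait, and misses on different keys do not block each other.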
Reading from the level-1 cache
    holder.getLevel1Cache(region).get(key)

    // EhCacheProvider: create the per-region EhCache on first use.
    @Override
    public EhCache buildCache(String regionName, CacheExpiredListener listener) {
        return caches.computeIfAbsent(regionName, v -> {
            net.sf.ehcache.Cache cache = manager.getCache(regionName);
            if (cache == null) {
                manager.addCache(regionName);
                cache = manager.getCache(regionName);
                log.warn("Could not find configuration [{}]; using defaults (TTL:{} seconds).", regionName, cache.getCacheConfiguration().getTimeToLiveSeconds());
            }
            return new EhCache(cache, listener);
        });
    }

    // EhCache: read the value out of the wrapped net.sf.ehcache.Cache.
    public Serializable get(String key) {
        if (key == null)
            return null;
        Element elem = cache.get(key);
        Object obj = (elem != null) ? elem.getObjectValue() : null;
        // A bare java.lang.Object is treated as "no value".
        return (obj == null || obj.getClass().equals(Object.class)) ? null : (Serializable) elem.getObjectValue();
    }
In short, the code obtains a net.sf.ehcache.Cache from net.sf.ehcache.CacheManager, wraps it in an EhCache, and then reads the value from the net.sf.ehcache.Cache through EhCache#get().
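The create-on-first-use behavior of buildCache can be illustrated without Ehcache. In this sketch each region is just a ConcurrentHashMap, and RegionCachesSketch is a hypothetical name; it only demonstrates the computeIfAbsent pattern.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RegionCachesSketch {
    // region name -> cache for that region
    private final Map<String, Map<String, Object>> caches = new ConcurrentHashMap<>();

    /** Returns the cache for the region, creating it atomically on first use. */
    Map<String, Object> buildCache(String region) {
        return caches.computeIfAbsent(region, r -> new ConcurrentHashMap<>());
    }

    Object get(String region, String key) {
        if (key == null) return null;
        return buildCache(region).get(key);
    }
}
```

ConcurrentHashMap.computeIfAbsent runs the factory at most once per key, so two threads asking for the same new region end up with the same cache instance.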
Reading from the level-2 cache
    holder.getLevel2Cache(region).get(key)

    @Override
    public Cache buildCache(String region, CacheExpiredListener listener) {
        if (config.getL2CacheOpen() == false) {
            return new NullCache();
        }
        Cache cache = caches.get(region);
        if (cache == null) {
            synchronized (SpringRedisProvider.class) {
                cache = caches.get(region);
                if (cache == null) {
                    // "hash" storage keeps a region in one Redis hash; otherwise plain keys are used.
                    if ("hash".equalsIgnoreCase(this.storage))
                        cache = new SpringRedisCache(this.namespace, region, redisTemplate);
                    else {
                        cache = new SpringRedisGenericCache(this.namespace, region, redisTemplate);
                    }
                    caches.put(region, cache);
                }
            }
        }
        return cache;
    }
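I use key-value storage. Compared with hash storage, its advantage is that an expiry time can be set per key, which is more flexible. In other words, cache in the code above is a SpringRedisGenericCache. SpringRedisGenericCache reads data directly through the Redis client, and because the value travels over the network as bytes, it has to be deserialized.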
    // Default method of the level-2 cache interface.
    @Override
    default Object get(String key) {
        byte[] bytes = getBytes(key);
        try {
            return SerializationUtils.deserialize(bytes);
        } catch (ClassCastException e) { // this catch header is missing from the original listing; the exception type is assumed
            log.warn("Failed to deserialize object with key:" + key + ",message: " + e.getMessage());
            evict(key);
            return null;
        } catch (IOException e) {
            throw new CacheException(e);
        }
    }

    // SpringRedisGenericCache
    @Override
    public byte[] getBytes(String key) {
        return redisTemplate.execute((RedisCallback<byte[]>) redis -> {
            return redis.get(_key(key));
        });
    }

    private byte[] _key(String key) {
        byte[] k;
        try {
            k = (this.region + ":" + key).getBytes("utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            k = (this.region + ":" + key).getBytes();
        }
        return k;
    }
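As a side note on _key: every Java platform is required to support UTF-8, so the UnsupportedEncodingException branch and its platform-default fallback can be avoided with StandardCharsets. The sketch below shows that variant; RedisKeySketch is an illustrative name, and this is a suggested alternative rather than what J2Cache does.

```java
import java.nio.charset.StandardCharsets;

final class RedisKeySketch {
    /** Builds the raw Redis key "region:key" as UTF-8 bytes, with no checked exception. */
    static byte[] key(String region, String key) {
        return (region + ":" + key).getBytes(StandardCharsets.UTF_8);
    }
}
```

Always encoding with the same charset also ensures that every node produces identical key bytes, regardless of its default locale settings.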
How does J2Cache keep caches consistent when inserting data?
    public void set(String region, String key, Object value, boolean cacheNullObject) {
        if (!cacheNullObject && value == null)
            return;
        if (closed)
            throw new IllegalStateException("CacheChannel closed");
        try {
            Level1Cache level1 = holder.getLevel1Cache(region);
            level1.put(key, (value == null && cacheNullObject) ? newNullObject() : value);
            Level2Cache level2 = holder.getLevel2Cache(region);
            if (config.isSyncTtlToRedis())
                level2.put(key, (value == null && cacheNullObject) ? newNullObject() : value, level1.ttl());
            else
                level2.put(key, (value == null && cacheNullObject) ? newNullObject() : value);
        } finally {
            // Broadcast the evict command even if one of the writes above failed.
            this.sendEvictCmd(region, key);
        }
    }
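Suppose there are two application nodes, A and B. When A receives a set request, it writes the level-1 cache, then the level-2 cache, and finally sends an evict command.
Until that evict command has been sent and handled by B, B's level-1 cache still holds the old value, so there is a window in which the data is inconsistent.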
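The scenario with nodes A and B can be simulated in a few lines. In this sketch an in-memory Bus stands in for the Redis channel, a shared map stands in for the level-2 cache, and the bus delivers the evict command synchronously; all class names are hypothetical. It also assumes that the sending node does not evict its own entry, which this post does not verify against the J2Cache source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class EvictBroadcastSketch {
    /** Hypothetical in-memory bus standing in for Redis pub/sub. */
    static final class Bus {
        private final List<Node> nodes = new ArrayList<>();

        void subscribe(Node node) { nodes.add(node); }

        void publishEvict(Node source, String key) {
            for (Node node : nodes) {
                if (node != source) node.level1.remove(key); // other nodes drop their local copy
            }
        }
    }

    static final class Node {
        final Map<String, String> level1 = new ConcurrentHashMap<>();
        private final Map<String, String> level2; // shared by all nodes
        private final Bus bus;

        Node(Map<String, String> level2, Bus bus) {
            this.level2 = level2;
            this.bus = bus;
            bus.subscribe(this);
        }

        void set(String key, String value) {
            try {
                level1.put(key, value);
                level2.put(key, value);
            } finally {
                bus.publishEvict(this, key); // same ordering as CacheChannel#set
            }
        }

        String get(String key) {
            // Level-1 hit, or load from level 2 and keep a local copy.
            return level1.computeIfAbsent(key, level2::get);
        }
    }
}
```

With a real network in between, the evict arrives some time after the level-2 write, and during that gap a read on B can still return the old level-1 value.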
Taking the pub/sub implementation in SpringRedisPubSubPolicy as an example, let's look at the sendEvictCmd method:
    // Default method of the ClusterPolicy interface.
    default void sendEvictCmd(String region, String... keys) {
        publish(new Command(Command.OPT_EVICT_KEY, region, keys));
    }

    // SpringRedisPubSubPolicy: nothing is published in active mode.
    @Override
    public void publish(Command cmd) {
        if (!isActive && config.getL2CacheOpen()) {
            cmd.setSrc(LOCAL_COMMAND_ID);
            redisTemplate.convertAndSend(this.channel, cmd.json());
        }
    }
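Once the message has been sent to Redis, how does application B listen for it and handle it?
It turns out that when J2Cache starts and ClusterPolicyFactory initializes SpringRedisPubSubPolicy, the ClusterPolicy implementation, its connect method is called. That method reads the configuration to decide which cache clean mode to use: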
    # Broadcast cache changes through Redis: the custom Spring Boot Redis implementation
    j2cache.broadcast = net.oschina.j2cache.cache.support.redis.SpringRedisPubSubPolicy

    # Cache clean mode
    # active: active eviction. When a level-2 entry expires, all nodes are notified to evict it; the advantage is that every node receives the eviction at the same time
    # passive: passive eviction (default). When a level-1 entry expires, the nodes are notified to evict it from both cache levels
    # blend: both modes run together, for nodes that need high cache accuracy and timeliness (using one of the first two modes is recommended)
    j2cache.cache-clean-mode=active

    @Override
    public void connect(Properties props, CacheProviderHolder holder) {
        this.holder = holder;
        this.config = SpringUtil.getBean(net.oschina.j2cache.autoconfigure.J2CacheConfig.class);
        if (config.getL2CacheOpen() == false) {
            return;
        }
        J2CacheConfig j2config = SpringUtil.getBean(J2CacheConfig.class);
        this.redisTemplate = SpringUtil.getBean("j2CacheRedisTemplate", RedisTemplate.class);
        String channel_name = j2config.getL2CacheProperties().getProperty("channel");
        if (channel_name != null && !channel_name.isEmpty()) {
            this.channel = channel_name;
        }
        RedisMessageListenerContainer listenerContainer = SpringUtil.getBean("j2CacheRedisMessageListenerContainer", RedisMessageListenerContainer.class);
        String namespace = j2config.getL2CacheProperties().getProperty("namespace");
        String database = j2config.getL2CacheProperties().getProperty("database");
        // Keyspace-notification channels for expired and deleted keys (database 0 by default).
        String expired = "__keyevent@" + (database == null || "".equals(database) ? "0" : database) + "__:expired";
        String del = "__keyevent@" + (database == null || "".equals(database) ? "0" : database) + "__:del";
        List<PatternTopic> topics = new ArrayList<>();
        topics.add(new PatternTopic(expired));
        topics.add(new PatternTopic(del));

        if ("active".equals(config.getCacheCleanMode())) {
            // active: listen to Redis key events only.
            isActive = true;
            ConfigureNotifyKeyspaceEventsAction action = new ConfigureNotifyKeyspaceEventsAction();
            action.config(listenerContainer.getConnectionFactory().getConnection());
            listenerContainer.addMessageListener(new SpringRedisActiveMessageListener(this, namespace), topics);
        } else if ("blend".equals(config.getCacheCleanMode())) {
            // blend: listen to key events and to the J2Cache command channel.
            ConfigureNotifyKeyspaceEventsAction action = new ConfigureNotifyKeyspaceEventsAction();
            action.config(listenerContainer.getConnectionFactory().getConnection());
            listenerContainer.addMessageListener(new SpringRedisActiveMessageListener(this, namespace), topics);
            listenerContainer.addMessageListener(new SpringRedisMessageListener(this, this.channel), new PatternTopic(this.channel));
        } else {
            // passive (default): listen to the J2Cache command channel only.
            listenerContainer.addMessageListener(new SpringRedisMessageListener(this, this.channel), new PatternTopic(this.channel));
        }
    }
In the active branch of connect, a Redis message listener, SpringRedisActiveMessageListener, is registered. It subscribes only to key-expired and key-deleted events.
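The two topic names built in connect follow Redis's keyevent channel format, __keyevent@<db>__:<event>. A small helper makes the defaulting to database 0 explicit; KeyEventChannelSketch is a hypothetical name used only for this illustration.

```java
final class KeyEventChannelSketch {
    /** Builds a keyevent channel name, falling back to database 0 when none is configured. */
    static String channel(String database, String event) {
        String db = (database == null || database.isEmpty()) ? "0" : database;
        return "__keyevent@" + db + "__:" + event;
    }
}
```

Redis publishes on these channels only when keyspace notifications are enabled through the notify-keyspace-events setting; judging by its name, ConfigureNotifyKeyspaceEventsAction takes care of that, though its body is not shown here.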
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // For keyevent notifications the message body is the name of the affected key.
        String key = message.toString();
        if (key == null) {
            return;
        }
        if (key.startsWith(namespace + ":")) {
            // Strip the namespace, then split the rest into region and key.
            String[] k = key.replaceFirst(namespace + ":", "").split(":", 2);
            if (k.length != 2) {
                return;
            }
            clusterPolicy.evict(k[0], k[1]);
        }
    }
SpringRedisActiveMessageListener only handles keys that belong to the configured namespace. For those it calls clusterPolicy.evict(k[0], k[1]), which evicts the entry from the level-1 cache.
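The key parsing in onMessage can be tried in isolation. The sketch below assumes the namespace:region:key layout implied by the listener and uses substring instead of replaceFirst, because replaceFirst interprets its first argument as a regular expression. NamespacedKeySketch is a hypothetical helper, not part of J2Cache.

```java
final class NamespacedKeySketch {
    /** Returns {region, key}, or null when the message is not a key of this namespace. */
    static String[] parse(String namespace, String message) {
        String prefix = namespace + ":";
        if (message == null || !message.startsWith(prefix)) return null;
        // Limit 2 keeps any further ':' characters inside the key part.
        String[] parts = message.substring(prefix.length()).split(":", 2);
        return parts.length == 2 ? parts : null;
    }
}
```

For example, parse("app", "app:users:42:extra") yields the region users and the key 42:extra, while a message from another namespace yields null.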
    public void evict(String region, String... keys) {
        holder.getLevel1Cache(region).evict(keys);
    }
References
Pitfalls of double-checked locking in the singleton pattern under concurrency