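This study approaches the topic from three directions:
- System startup, which loads the J2CacheSpringRedisAutoConfiguration configuration
- The J2Cache cluster policy, SpringRedisPubSubPolicy
- Level-2 cache operations: SpringRedisProvider, SpringRedisGenericCache, and SpringRedisCache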
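1. Creating the RedisTemplate object
The J2CacheSpringRedisAutoConfiguration class is loaded after RedisAutoConfiguration and before J2CacheAutoConfiguration.
The Redis client is selected through j2cache.redis-client, either Jedis or Lettuce; this study uses Lettuce.
A LettuceConnectionFactory is built from a RedisStandaloneConfiguration.
Finally, the RedisTemplate object is registered as a bean so that Spring manages it.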
```java
@Bean("j2CacheRedisTemplate")
public RedisTemplate<String, Serializable> j2CacheRedisTemplate(
        @Qualifier("j2CahceRedisConnectionFactory") RedisConnectionFactory j2CahceRedisConnectionFactory,
        @Qualifier("j2CacheValueSerializer") RedisSerializer<Object> j2CacheSerializer) {
    RedisTemplate<String, Serializable> template = new RedisTemplate<String, Serializable>();
    template.setKeySerializer(new StringRedisSerializer());
    template.setHashKeySerializer(new StringRedisSerializer());
    template.setDefaultSerializer(j2CacheSerializer);
    template.setConnectionFactory(j2CahceRedisConnectionFactory);
    return template;
}

@Primary
@Bean("j2CahceRedisConnectionFactory")
public LettuceConnectionFactory lettuceConnectionFactory(net.oschina.j2cache.J2CacheConfig j2CacheConfig) {
    LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder config =
            LettucePoolingClientConfiguration.builder();
    config.commandTimeout(Duration.ofMillis(5000));
    config.poolConfig(getGenericRedisPool());
    RedisStandaloneConfiguration single = new RedisStandaloneConfiguration("xxxx", 6379);
    single.setDatabase(10);
    single.setPassword("****");
    LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(single, config.build());
    return connectionFactory;
}
```
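Now look at the RedisTemplate class:
RedisTemplate extends RedisAccessor and implements the RedisOperations and BeanClassLoaderAware interfaces. RedisAccessor holds the Redis connection factory, and RedisOperations defines the Redis operations.
Here the connectionFactory is a LettuceConnectionFactory, whose afterPropertiesSet method does the preparation needed before Redis can be used: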
```java
private @Nullable AbstractRedisClient client;
private @Nullable LettuceConnectionProvider connectionProvider;
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;

public void afterPropertiesSet() {
    this.client = createClient();
    this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
    this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
            createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
}

protected AbstractRedisClient createClient() {
    RedisURI uri = isDomainSocketAware()
            ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
            : createRedisURIAndApplySettings(getHostName(), getPort());
    RedisClient redisClient = clientConfiguration.getClientResources()
            .map(clientResources -> RedisClient.create(clientResources, uri))
            .orElseGet(() -> RedisClient.create(uri));
    clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
    return redisClient;
}
```
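First, the RedisClient is created.
Then the LettuceConnectionProvider is created. The decorator pattern is used here: StandaloneConnectionProvider -> LettucePoolingConnectionProvider -> ExceptionTranslatingConnectionProvider, and the names alone give a rough idea of what each decorator adds:
- StandaloneConnectionProvider: obtains a standalone Redis connection; Netty is used underneath.
- LettucePoolingConnectionProvider: pools Redis connections using org.apache.commons.pool2.impl.GenericObjectPool.
- ExceptionTranslatingConnectionProvider: translates Redis connection exceptions.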
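The layering of the three providers can be illustrated with a small JDK-only sketch. Every type below (ConnectionProvider, StandaloneProvider, PoolingProvider, ExceptionTranslatingProvider) is a hypothetical stand-in invented for this illustration, with a String playing the role of a connection; it is not the Spring Data Redis code, only the same decorator shape.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// JDK-only sketch of the decorator chain; all names are hypothetical stand-ins.
class ProviderChainSketch {

    /** Minimal provider contract: hand out a connection, take it back. */
    interface ConnectionProvider {
        String getConnection();
        void release(String connection);
    }

    /** Innermost layer: opens a new "connection" on every call. */
    static class StandaloneProvider implements ConnectionProvider {
        private int created = 0;
        public String getConnection() { return "conn-" + (++created); }
        public void release(String connection) { /* a real provider would close the socket */ }
        int createdCount() { return created; }
    }

    /** Middle layer: reuses released connections before asking the delegate. */
    static class PoolingProvider implements ConnectionProvider {
        private final ConnectionProvider delegate;
        private final Deque<String> idle = new ArrayDeque<>();
        PoolingProvider(ConnectionProvider delegate) { this.delegate = delegate; }
        public String getConnection() {
            String pooled = idle.poll();
            return pooled != null ? pooled : delegate.getConnection();
        }
        public void release(String connection) { idle.push(connection); }
    }

    /** Outermost layer: turns low-level failures into one exception type. */
    static class ExceptionTranslatingProvider implements ConnectionProvider {
        private final ConnectionProvider delegate;
        ExceptionTranslatingProvider(ConnectionProvider delegate) { this.delegate = delegate; }
        public String getConnection() {
            try {
                return delegate.getConnection();
            } catch (RuntimeException e) {
                throw new IllegalStateException("translated: " + e.getMessage(), e);
            }
        }
        public void release(String connection) { delegate.release(connection); }
    }

    /** Borrows, releases, and borrows again; returns how many connections were created. */
    static int demo() {
        StandaloneProvider standalone = new StandaloneProvider();
        ConnectionProvider provider =
                new ExceptionTranslatingProvider(new PoolingProvider(standalone));
        String first = provider.getConnection();
        provider.release(first);
        String second = provider.getConnection(); // served from the pool
        return first.equals(second) ? standalone.createdCount() : -1;
    }

    public static void main(String[] args) {
        System.out.println("connections created: " + demo());
    }
}
```

Because each layer only knows the interface of the layer beneath it, pooling or exception translation can be added or removed without touching the code that opens connections.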
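2. Reading and writing Redis data
SpringRedisGenericCache stores data as key-value pairs and supports key expiry.
SpringRedisCache stores data in hash structures.
The setBytes method of SpringRedisGenericCache serves as the example for analyzing how RedisTemplate operates on Redis.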
```java
@Override
public void setBytes(String key, byte[] bytes, long timeToLiveInSeconds) {
    if (timeToLiveInSeconds <= 0) {
        // No usable TTL: fall back to the overload that stores the key without expiry
        log.debug(String.format("Invalid timeToLiveInSeconds value : %d , skipped it.", timeToLiveInSeconds));
        setBytes(key, bytes);
    } else {
        redisTemplate.execute((RedisCallback<List<byte[]>>) redis -> {
            redis.setEx(_key(key), (int) timeToLiveInSeconds, bytes);
            return null;
        });
    }
}
```
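As the source shows, RedisTemplate's execute method takes a RedisCallback. The callback is invoked once a RedisConnection has been obtained, and it runs the low-level Redis code. With this design, RedisTemplate hides the acquisition and release of the connection from the caller while leaving the actual Redis operation open to the caller.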
```java
@Override
@Nullable
public <T> T execute(RedisCallback<T> action) {
    return execute(action, isExposeConnection());
}

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");

    RedisConnectionFactory factory = getRequiredConnectionFactory();
    RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);
    try {
        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);

        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }

        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
        T result = action.doInRedis(connToExpose);

        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }
        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}
```
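The template method pattern combined with a callback implements the whole flow of a Redis operation:
- Obtain the RedisConnectionFactory; in this test it is a LettuceConnectionFactory.
- Obtain a RedisConnection; underneath, this is LettuceConnectionFactory.getConnection().
- Invoke the callback. The caller decides which Redis methods to run, and the RedisConnection is passed to the caller as the callback argument.
- Release the RedisConnection.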
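The acquire, callback, release sequence can be reduced to a few lines of plain Java. The sketch below is only an illustration of the pattern: Template and FakeConnection are hypothetical names, and a log list stands in for a real Redis server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// JDK-only sketch of "template + callback"; all names are hypothetical.
class TemplateCallbackSketch {

    /** Stand-in for a connection; it only records the commands it receives. */
    static class FakeConnection {
        private final List<String> log;
        FakeConnection(List<String> log) { this.log = log; }
        void setEx(String key, long seconds, String value) {
            log.add("SETEX " + key + " " + seconds + " " + value);
        }
    }

    /** Owns the acquire and release steps; the caller supplies only the middle step. */
    static class Template {
        final List<String> log = new ArrayList<>();

        <T> T execute(Function<FakeConnection, T> callback) {
            log.add("acquire");
            FakeConnection connection = new FakeConnection(log);
            try {
                return callback.apply(connection);
            } finally {
                // Runs even if the callback throws, so the connection is always returned
                log.add("release");
            }
        }
    }

    /** Runs one callback and returns the recorded sequence of events. */
    static List<String> demo() {
        Template template = new Template();
        template.execute(connection -> {
            connection.setEx("k", 30, "v");
            return null;
        });
        return template.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller never sees how the connection is obtained or returned, which is the property the list above attributes to RedisTemplate.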
2.1 Obtaining a RedisConnection
2.1.1 The Redis connection factory provided by Spring
The call chain is RedisConnectionFactory -> ExceptionTranslatingConnectionProvider -> LettucePoolingConnectionProvider -> StandaloneConnectionProvider.
```java
// Caller inside LettuceConnectionFactory
private StatefulConnection<E, E> getNativeConnection() {
    return connectionProvider.getConnection(StatefulConnection.class);
}

// ExceptionTranslatingConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    try {
        return delegate.getConnection(connectionType);
    } catch (RuntimeException e) {
        throw translateException(e);
    }
}

// LettucePoolingConnectionProvider
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(32);

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
        return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                poolConfig, false);
    });
    try {
        StatefulConnection<?, ?> connection = pool.borrowObject();
        poolRef.put(connection, pool);
        return connectionType.cast(connection);
    } catch (Exception e) {
        throw new PoolException("Could not get a resource from the pool", e);
    }
}

// StandaloneConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
    return getConnectionAsync(connectionType, redisURISupplier.get());
}

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
        return connectionType.cast(client.connectSentinel());
    }
    if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
        return connectionType.cast(client.connectPubSub(codec));
    }
    if (StatefulConnection.class.isAssignableFrom(connectionType)) {
        return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
                .orElseGet(() -> client.connect(codec)));
    }
    throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}
```
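The pooling layer above keeps two maps: one pool per connection type, created lazily with computeIfAbsent, and a back-reference from each borrowed connection to the pool it came from. A JDK-only sketch of that bookkeeping follows. SimplePool, PlainConnection and PubSubConnection are hypothetical names; commons-pool2 is deliberately replaced by a queue, and the release method is an assumption about how the back-reference would be used.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// JDK-only sketch of "one lazily created pool per connection type"; names are hypothetical.
class PerTypePoolSketch {

    /** A very small pool: hands back idle objects, otherwise creates new ones. */
    static class SimplePool {
        private final Queue<Object> idle = new ConcurrentLinkedQueue<>();
        private final Supplier<?> factory;
        SimplePool(Supplier<?> factory) { this.factory = factory; }
        Object borrow() {
            Object pooled = idle.poll();
            return pooled != null ? pooled : factory.get();
        }
        void giveBack(Object connection) { idle.add(connection); }
    }

    static class PlainConnection { }
    static class PubSubConnection { }

    private final Map<Class<?>, SimplePool> pools = new ConcurrentHashMap<>();
    private final Map<Object, SimplePool> poolRef = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    /** Borrows from the pool of the given type; the pool is built on first use. */
    <T> T getConnection(Class<T> type, Supplier<T> factory) {
        SimplePool pool = pools.computeIfAbsent(type, t -> new SimplePool(() -> {
            created.incrementAndGet();
            return factory.get();
        }));
        Object connection = pool.borrow();
        poolRef.put(connection, pool); // remember where the connection must be returned
        return type.cast(connection);
    }

    /** Returns a connection to the pool it was borrowed from. */
    void release(Object connection) {
        SimplePool pool = poolRef.remove(connection);
        if (pool == null) {
            throw new IllegalStateException("connection was not borrowed from this provider");
        }
        pool.giveBack(connection);
    }

    /** True when a released connection is reused and each type gets its own pool. */
    static boolean demo() {
        PerTypePoolSketch provider = new PerTypePoolSketch();
        PlainConnection first = provider.getConnection(PlainConnection.class, PlainConnection::new);
        provider.release(first);
        PlainConnection second = provider.getConnection(PlainConnection.class, PlainConnection::new);
        provider.getConnection(PubSubConnection.class, PubSubConnection::new);
        return first == second && provider.created.get() == 2;
    }

    public static void main(String[] args) {
        System.out.println("reused and separated by type: " + demo());
    }
}
```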
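2.1.2 How Lettuce creates a connection
The code that creates the Redis connection is hard to read for several reasons:
- the use of CompletableFuture
- Lettuce's EventRecorder
- the use of Reactor's Mono
- the use of Netty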
2.1.2.1 Entry point: connect

```java
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
    assertNotNull(redisURI);
    return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
}
```
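The Netty code that connects to Redis is reached through connectStandaloneAsync -> connectStatefulAsync -> initializeChannelAsync -> initializeChannelAsync0, in that order.
2.1.2.2 connectStandaloneAsync
This method mainly creates the StatefulRedisConnectionImpl object. StatefulRedisConnectionImpl provides the RedisCommand objects to the outside and sends commands internally through the writer.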
```java
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {
    assertNotNull(codec);
    checkValidRedisURI(redisURI);
    logger.debug("Trying to get a Redis connection for: {}", redisURI);

    DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
    // The endpoint is the base writer; optional writers wrap it below
    RedisChannelWriter writer = endpoint;
    if (CommandExpiryWriter.isSupported(getOptions())) {
        writer = new CommandExpiryWriter(writer, getOptions(), getResources());
    }
    if (CommandListenerWriter.isSupported(getCommandListeners())) {
        writer = new CommandListenerWriter(writer, getCommandListeners());
    }

    StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);
    ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
            () -> new CommandHandler(getOptions(), getResources(), endpoint));

    // Close the half-built connection if the connect attempt fails
    future.whenComplete((channelHandler, throwable) -> {
        if (throwable != null) {
            connection.closeAsync();
        }
    });
    return future;
}
```
2.1.2.3 connectStatefulAsync
This method builds a ConnectionBuilder, which is then used to create the connection.
```java
@SuppressWarnings("unchecked")
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
        Endpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
    ConnectionBuilder connectionBuilder;
    if (redisURI.isSsl()) {
        SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
        sslConnectionBuilder.ssl(redisURI);
        connectionBuilder = sslConnectionBuilder;
    } else {
        connectionBuilder = ConnectionBuilder.connectionBuilder();
    }

    ConnectionState state = connection.getConnectionState();
    state.apply(redisURI);
    state.setDb(redisURI.getDatabase());

    connectionBuilder.connection(connection);
    connectionBuilder.clientOptions(getOptions());
    connectionBuilder.clientResources(getResources());
    connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);

    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(),
            redisURI);
    connectionBuilder.connectionInitializer(createHandshake(state));

    ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
    return future.thenApply(channelHandler -> (S) connection);
}
```
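getSocketAddressSupplier delegates to getSocketAddress, which creates the Mono lazily through Mono.defer, so the address lookup does not run until the Mono is subscribed to.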
```java
private Mono<SocketAddress> getSocketAddressSupplier(RedisURI redisURI) {
    return getSocketAddress(redisURI).doOnNext(addr -> logger.debug("Resolved SocketAddress {} using {}", addr, redisURI));
}

protected Mono<SocketAddress> getSocketAddress(RedisURI redisURI) {
    return Mono.defer(() -> {
        if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
            logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(),
                    redisURI.getSentinelMasterId());
            return lookupRedis(redisURI).switchIfEmpty(Mono.error(new RedisConnectionException(
                    "Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId())));
        } else {
            return Mono.fromCallable(() -> getResources().socketAddressResolver().resolve((redisURI)));
        }
    });
}
```
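Deferred creation is easier to see without Reactor. The sketch below is a JDK-only analogue that uses java.util.function.Supplier in place of Mono.defer: building the supplier runs nothing, and each call to get() (standing in for a subscription) runs the lookup again. The names deferredAddress and lookups are invented for this illustration, and the analogy covers only the laziness, not Reactor's asynchronous behavior.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// JDK-only analogue of deferred evaluation; Supplier stands in for Mono.defer.
class DeferredLookupSketch {

    static final AtomicInteger lookups = new AtomicInteger();

    /** Builds the recipe for an address lookup without running it. */
    static Supplier<String> deferredAddress(String host, int port) {
        return () -> {
            lookups.incrementAndGet(); // counts how often the lookup really runs
            return host + ":" + port;
        };
    }

    /** Returns the lookup count before and after the supplier is used twice. */
    static int[] demo() {
        lookups.set(0);
        Supplier<String> address = deferredAddress("localhost", 6379);
        int before = lookups.get(); // still 0: nothing has been resolved yet
        address.get();
        address.get();
        int after = lookups.get();  // one lookup per get() call
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```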
The connectionBuilder method creates the Bootstrap object of the Netty client and configures the Netty options.
```java
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
        ConnectionEvents connectionEvents, RedisURI redisURI) {
    Bootstrap redisBootstrap = new Bootstrap();
    redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);

    connectionBuilder.bootstrap(redisBootstrap);
    connectionBuilder.apply(redisURI);
    connectionBuilder.configureBootstrap(!LettuceStrings.isEmpty(redisURI.getSocket()), this::getEventLoopGroup);
    connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents == this.connectionEvents
            ? connectionEvents
            : ConnectionEvents.of(this.connectionEvents, connectionEvents));
    connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}
```
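2.1.2.4 initializeChannelAsync
This method handles the SocketAddress and the Channel asynchronously and finally calls initializeChannelAsync0, the method that actually obtains the physical connection.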
```java
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
        ConnectionBuilder connectionBuilder) {
    Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();

    if (clientResources.eventExecutorGroup().isShuttingDown()) {
        throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
    }

    CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
    CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

    String uriString = connectionBuilder.getRedisURI().toString();
    EventRecorder.getInstance().record(
            new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));
    EventRecorder.RecordableEvent event = EventRecorder.getInstance()
            .start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

    channelReadyFuture.whenComplete((channel, throwable) -> {
        event.record();
    });

    socketAddressSupplier
            .doOnError(socketAddressFuture::completeExceptionally)
            .doOnNext(socketAddressFuture::complete)
            .subscribe(redisAddress -> {
                if (channelReadyFuture.isCancelled()) {
                    return;
                }
                initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
            }, channelReadyFuture::completeExceptionally);

    return new DefaultConnectionFuture<>(socketAddressFuture,
            channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));
}
```
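The central idea in this method is a hand-completed CompletableFuture (channelReadyFuture) that an asynchronous callback later completes, with thenApply mapping the channel to the connection object. A JDK-only sketch of that bridging follows; supplyAsync stands in for the Reactor subscription, strings stand in for the address, channel and connection, and the connect helper is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// JDK-only sketch of completing a future from an asynchronous callback; names are hypothetical.
class ChannelFutureSketch {

    /** Resolves an address asynchronously, then completes channelReady by hand. */
    static CompletableFuture<String> connect(Supplier<String> addressResolver) {
        CompletableFuture<String> channelReady = new CompletableFuture<>();
        CompletableFuture.supplyAsync(addressResolver).whenComplete((address, error) -> {
            if (error != null) {
                channelReady.completeExceptionally(error); // resolution failed
            } else if (!channelReady.isCancelled()) {
                channelReady.complete("channel->" + address); // "physical connection" is ready
            }
        });
        // Callers receive the connection object only after the channel is ready
        return channelReady.thenApply(channel -> "connection(" + channel + ")");
    }

    /** True when a failing resolver surfaces as a failed connect. */
    static boolean failsWhenResolutionFails() {
        try {
            connect(() -> { throw new IllegalStateException("cannot resolve"); }).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(connect(() -> "localhost:6379").join());
        System.out.println("failure propagated: " + failsWhenResolutionFails());
    }
}
```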
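2.1.2.5 initializeChannelAsync0
This method drives Netty to create the connection. It relies heavily on CompletableFuture and other asynchronous constructs, which makes the code fairly hard to follow.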
```java
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
        SocketAddress redisAddress) {
    logger.debug("Connecting to Redis at {}", redisAddress);

    Bootstrap redisBootstrap = connectionBuilder.bootstrap();
    ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);
    redisBootstrap.handler(initializer);

    clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
    ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

    // Cancelling the outer future cancels the pending Netty connect
    channelReadyFuture.whenComplete((c, t) -> {
        if (t instanceof CancellationException) {
            connectFuture.cancel(true);
        }
    });

    connectFuture.addListener(future -> {
        if (!future.isSuccess()) {
            logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
            connectionBuilder.endpoint().initialState();
            channelReadyFuture.completeExceptionally(future.cause());
            return;
        }

        RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);
        if (handshakeHandler == null) {
            channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
            return;
        }

        handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {
            if (throwable == null) {
                logger.debug("Connecting to Redis at {}: Success", redisAddress);
                RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                connection.registerCloseables(closeableResources, connection);
                channelReadyFuture.complete(connectFuture.channel());
                return;
            }

            logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
            connectionBuilder.endpoint().initialState();
            channelReadyFuture.completeExceptionally(throwable);
        });
    });
}
```
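One detail that is easy to miss in this method is the cancellation wiring: when the caller cancels channelReadyFuture, the whenComplete callback sees a CancellationException and cancels the pending connect. The JDK-only sketch below reproduces just that wiring; a second CompletableFuture named connectAttempt stands in for Netty's ChannelFuture, which is an assumption made so the example needs no Netty dependency.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// JDK-only sketch of propagating a cancel from the outer future to the inner attempt.
class CancelPropagationSketch {

    /** True when cancelling the outer future also cancels the pending attempt. */
    static boolean demo() {
        // Stand-in for the in-flight connect (a Netty ChannelFuture in the real code)
        CompletableFuture<String> connectAttempt = new CompletableFuture<>();
        CompletableFuture<String> channelReady = new CompletableFuture<>();

        channelReady.whenComplete((channel, error) -> {
            if (error instanceof CancellationException) {
                connectAttempt.cancel(true);
            }
        });

        channelReady.cancel(false); // the caller gives up waiting
        return connectAttempt.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("inner attempt cancelled: " + demo());
    }
}
```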
2.2 Redis operations
Taking a write as the example, this section analyzes how Spring operates on Redis through Lettuce.
```java
RedisConnection connection = connectionFactory.getConnection();
connection.set("key".getBytes(StandardCharsets.UTF_8), "123".getBytes(StandardCharsets.UTF_8));
```
The connection here is an org.springframework.data.redis.connection.lettuce.LettuceConnection:
```java
public interface RedisConnection extends RedisCommands, AutoCloseable {
    // members omitted
}

public interface RedisCommands extends RedisKeyCommands, RedisStringCommands, RedisListCommands, RedisSetCommands,
        RedisZSetCommands, RedisHashCommands, RedisTxCommands, RedisPubSubCommands, RedisConnectionCommands,
        RedisServerCommands, RedisStreamCommands, RedisScriptingCommands, RedisGeoCommands,
        RedisHyperLogLogCommands {

    @Nullable
    Object execute(String command, byte[]... args);
}
```
Stepping through with a debugger, the call passes through the following methods in order:
- org.springframework.data.redis.connection.DefaultedRedisConnection#set(byte[], byte[])
- org.springframework.data.redis.connection.lettuce.LettuceConnection#stringCommands
- org.springframework.data.redis.connection.lettuce.LettuceStringCommands#set(byte[], byte[])
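The call path suggests that the connection interface forwards each command to a per-category command object through a default method. The following JDK-only sketch shows that delegation shape under that assumption; StringCommands, Connection, MapStringCommands and MapConnection are hypothetical names, and a HashMap replaces Redis.

```java
import java.util.HashMap;
import java.util.Map;

// JDK-only sketch of default-method delegation; all names are hypothetical.
class DefaultDelegationSketch {

    /** One command category. */
    interface StringCommands {
        Boolean set(String key, String value);
    }

    /** The connection exposes the category and forwards the command to it. */
    interface Connection extends StringCommands {
        StringCommands stringCommands();

        @Override
        default Boolean set(String key, String value) {
            return stringCommands().set(key, value);
        }
    }

    /** Category implementation backed by an in-memory map. */
    static class MapStringCommands implements StringCommands {
        final Map<String, String> store = new HashMap<>();
        public Boolean set(String key, String value) {
            store.put(key, value);
            return true;
        }
    }

    /** A connection only has to say which command object handles strings. */
    static class MapConnection implements Connection {
        private final MapStringCommands commands = new MapStringCommands();
        public StringCommands stringCommands() { return commands; }
        String read(String key) { return commands.store.get(key); }
    }

    public static void main(String[] args) {
        MapConnection connection = new MapConnection();
        System.out.println(connection.set("key", "123") + " " + connection.read("key"));
    }
}
```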
2.2.1 Spring's LettuceInvoker: calling Lettuce methods over the asynchronous connection

```java
@Override
public Boolean set(byte[] key, byte[] value) {
    return connection.invoke().from(RedisStringAsyncCommands::set, key, value)
            .get(Converters.stringToBooleanConverter());
}
```
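2.2.1.1 The LettuceInvoker returned by LettuceConnection#invoke()
LettuceInvoker is a utility for invoking Lettuce methods. A method call is typically expressed as a method reference, and the arguments are passed through one of the just or from methods.
- The just methods record the method call and evaluate the result immediately.
- The from methods allow a pipeline to be composed that transforms the result with a converter.
Usage example:

```java
LettuceInvoker invoker = …;
Long result = invoker.just(RedisGeoAsyncCommands::geoadd, key, point.getX(), point.getY(), member);
List<byte[]> result = invoker.fromMany(RedisGeoAsyncCommands::geohash, key, members).toList(it -> it.getValueOrElse(null));
```

The RedisFuture is handed over to LettuceInvoker to manage.
The Synchronizer can either wait for completion or record the future together with its converter for further processing.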
```java
LettuceInvoker invoke() {
    return invoke(getAsyncConnection());
}

LettuceInvoker invoke(RedisClusterAsyncCommands<byte[], byte[]> connection) {
    return doInvoke(connection, false);
}

private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {
    // The lambda is the Synchronizer: it runs the command, awaits it, and converts the result
    return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
        try {
            Object result = await(future.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    });
}

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
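getAsyncConnection obtains the asynchronous connection.
The analysis of obtaining a RedisConnection showed that StatefulRedisConnectionImpl holds a RedisAsyncCommandsImpl instance.
Here the async property of the connection, that RedisAsyncCommandsImpl instance, is retrieved.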
```java
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    // Transactions and pipelines need a dedicated connection
    if (isQueueing() || isPipelined()) {
        return getAsyncDedicatedConnection();
    }
    if (asyncSharedConn != null) {
        if (asyncSharedConn instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
        }
        if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
        }
    }
    return getAsyncDedicatedConnection();
}
```
How the async object is set and retrieved:
```java
@Override
public RedisAsyncCommands<K, V> async() {
    return async;
}

public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
        Duration timeout) {
    super(writer, timeout);
    this.pushHandler = pushHandler;
    this.codec = codec;
    this.async = newRedisAsyncCommandsImpl();
    this.sync = newRedisSyncCommandsImpl();
    this.reactive = newRedisReactiveCommandsImpl();
}
```
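2.2.1.2 The from method of LettuceInvoker
The difficulty in reading this source lies in the @FunctionalInterface functional interfaces and the lambda expressions.
Two facts need to be known first:
- When the Redis connection is created, io.lettuce.core.RedisClient#connectStandaloneAsync calls newStatefulRedisConnection, which ends up instantiating a StatefulRedisConnectionImpl; note its async property.
- The connection property of the LettuceInvoker from the previous subsection is this async object, that is, the RedisAsyncCommandsImpl instance.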
```java
// LettuceStringCommands#set, expanded to show three equivalent ways of writing the function.
// Only one declaration can be active at a time; the library source uses the method reference.
public Boolean set(byte[] key, byte[] value) {
    // Form 1: anonymous inner class
    LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
            new LettuceInvoker.ConnectionFunction2<byte[], byte[], String>() {
                @Override
                public RedisFuture<String> apply(RedisClusterAsyncCommands<byte[], byte[]> connection,
                        byte[] key, byte[] value) {
                    return connection.set(key, value);
                }
            };
    // Form 2: lambda expression
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
    //         (connection, key1, value1) -> connection.set(key1, value1);
    // Form 3: method reference
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = RedisStringAsyncCommands::set;
    return connection.invoke().from(function, key, value)
            .get(Converters.stringToBooleanConverter());
}

@FunctionalInterface
interface ConnectionFunction2<T1, T2, R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, T1 t1, T2 t2);
}

// Binds the two arguments, leaving a function that only needs the connection
<R, T1, T2> SingleInvocationSpec<R> from(ConnectionFunction2<T1, T2, R> function, T1 t1, T2 t2) {
    Assert.notNull(function, "ConnectionFunction must not be null");
    // Anonymous-class form
    ConnectionFunction0<R> function2 = new ConnectionFunction0<R>() {
        @Override
        public RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection) {
            return function.apply(connection, t1, t2);
        }
    };
    // Equivalent lambda form
    // ConnectionFunction0<R> function2 = connection -> function.apply(connection, t1, t2);
    return from(function2);
}

@FunctionalInterface
interface ConnectionFunction0<R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection);
}

// Binds the connection, leaving a Supplier that runs the command when get() is called
<R> SingleInvocationSpec<R> from(ConnectionFunction0<R> function) {
    Assert.notNull(function, "ConnectionFunction must not be null");
    // Anonymous-class form
    Supplier<RedisFuture<R>> parent = new Supplier<RedisFuture<R>>() {
        @Override
        public RedisFuture<R> get() {
            return function.apply(connection);
        }
    };
    // Equivalent lambda form
    // Supplier<RedisFuture<R>> parent = () -> function.apply(connection);
    return new DefaultSingleInvocationSpec<>(parent, synchronizer);
}
```
As the code shows, Spring's LettuceInvoker arranges the call on RedisClusterAsyncCommands into a DefaultSingleInvocationSpec object, and only then is the get method of SingleInvocationSpec called.
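The same argument-binding steps can be tried in isolation with JDK types only. In this sketch FakeCommands, Function2 and Invoker are hypothetical stand-ins, and CompletableFuture replaces RedisFuture; the point is that from only builds a Supplier, so the command does not run until that Supplier is asked for its value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// JDK-only sketch of binding arguments into a deferred invocation; names are hypothetical.
class InvocationSpecSketch {

    /** Stand-in for the asynchronous command object; counts how often set runs. */
    static class FakeCommands {
        int calls = 0;
        CompletableFuture<String> set(String key, String value) {
            calls++;
            return CompletableFuture.completedFuture("OK");
        }
    }

    /** A command that needs the command object plus two arguments. */
    @FunctionalInterface
    interface Function2<T1, T2, R> {
        CompletableFuture<R> apply(FakeCommands commands, T1 t1, T2 t2);
    }

    /** Binds the arguments and the command object, but does not run anything yet. */
    static class Invoker {
        private final FakeCommands commands;
        Invoker(FakeCommands commands) { this.commands = commands; }

        <T1, T2, R> Supplier<CompletableFuture<R>> from(Function2<T1, T2, R> function, T1 t1, T2 t2) {
            return () -> function.apply(commands, t1, t2);
        }
    }

    /** Returns "callsBefore/callsAfter/reply" to show when the command actually ran. */
    static String demo() {
        FakeCommands commands = new FakeCommands();
        Invoker invoker = new Invoker(commands);
        Supplier<CompletableFuture<String>> spec = invoker.from(FakeCommands::set, "k", "v");
        int before = commands.calls;      // the command has not been sent yet
        String reply = spec.get().join(); // the command runs here
        return before + "/" + commands.calls + "/" + reply;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```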
2.2.1.3 The get method of SingleInvocationSpec

```java
@Override
public <T> T get(Converter<S, T> converter) {
    Assert.notNull(converter, "Converter must not be null");
    return synchronizer.invoke(parent, converter, () -> null);
}
```
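- converter is Converters.stringToBooleanConverter(), which converts the returned "OK" string to Boolean true.
- parent is the Supplier that produces the RedisFuture.
- synchronizer is a LettuceInvoker.Synchronizer. As section 2.2.1.1 shows, it is passed in when the LettuceInvoker is constructed: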
```java
LettuceInvoker.Synchronizer synchronizer = new LettuceInvoker.Synchronizer() {
    @Override
    public Object doInvoke(Supplier<RedisFuture<Object>> futureSupplier, Converter<Object, Object> converter,
            Supplier<Object> nullDefault) {
        try {
            Object result = await(futureSupplier.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    }
};

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
Calling futureSupplier.get() triggers RedisStringAsyncCommands#set(K, V).
await then waits up to the timeout for the result of the RedisFuture.
This, at last, is the point where the Redis command is actually executed.
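The await-then-convert step can be approximated with plain CompletableFuture. The sketch below is an assumption-laden stand-in rather than the Lettuce implementation: awaitOrCancel here is a local helper with the same name, it cancels the future when the timeout elapses, and a Function plays the role of the converter that turns "OK" into true.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// JDK-only sketch of "run, await with timeout, convert"; helper names are hypothetical.
class AwaitOrCancelSketch {

    /** Waits for the result; on timeout the future is cancelled and an exception is thrown. */
    static <T> T awaitOrCancel(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException("command timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("command failed", e.getCause());
        }
    }

    /** Triggers the command, awaits it, and converts a non-null reply. */
    static <S, T> T invoke(Supplier<CompletableFuture<S>> futureSupplier, Function<S, T> converter,
            Supplier<T> nullDefault) {
        S result = awaitOrCancel(futureSupplier.get(), 100, TimeUnit.MILLISECONDS);
        return result == null ? nullDefault.get() : converter.apply(result);
    }

    /** Converts an "OK" reply into Boolean.TRUE. */
    static Boolean okReply() {
        Supplier<CompletableFuture<String>> command = () -> CompletableFuture.completedFuture("OK");
        Function<String, Boolean> okToTrue = "OK"::equals;
        return invoke(command, okToTrue, () -> null);
    }

    /** True when a reply that never arrives ends in a timeout and a cancelled future. */
    static boolean timesOutAndCancels() {
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            awaitOrCancel(never, 50, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalStateException e) {
            return never.isCancelled();
        }
    }

    public static void main(String[] args) {
        System.out.println("OK converted to: " + okReply());
        System.out.println("timeout cancels: " + timesOutAndCancels());
    }
}
```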
2.2.2 Execution flow of the set method of RedisStringAsyncCommands
2.3 Releasing the RedisConnection
3. Redis publish/subscribe
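- SpringRedisPubSubPolicy registers listeners for the expiry and deletion of key-value entries in Redis.
- SpringRedisPubSubPolicy#publish publishes messages.
- SpringRedisMessageListener listens for the subscribed messages.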
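The general idea behind such a policy, where one node publishes a change and the other nodes drop their local copy, can be sketched without Redis. The code below is not the SpringRedisPubSubPolicy implementation: Bus and Node are hypothetical names, the bus is an in-memory list of subscribers, and the rule that a node ignores its own messages is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only sketch of cache invalidation over publish/subscribe; all names are hypothetical.
class CacheInvalidationSketch {

    /** In-memory stand-in for a pub/sub channel. */
    static class Bus {
        private final List<Node> subscribers = new ArrayList<>();
        void subscribe(Node node) { subscribers.add(node); }
        void publish(String senderId, String key) {
            for (Node node : subscribers) {
                node.onMessage(senderId, key);
            }
        }
    }

    /** One application node with its own level-1 cache. */
    static class Node {
        final String id;
        final Bus bus;
        final Map<String, String> level1 = new HashMap<>();

        Node(String id, Bus bus) {
            this.id = id;
            this.bus = bus;
            bus.subscribe(this);
        }

        /** Writes locally, then tells the other nodes that the key changed. */
        void put(String key, String value) {
            level1.put(key, value);
            bus.publish(id, key);
        }

        /** Drops the local copy when another node changed the key. */
        void onMessage(String senderId, String key) {
            if (!id.equals(senderId)) {
                level1.remove(key);
            }
        }
    }

    /** True when a write on node A evicts the stale entry on node B but keeps A's own copy. */
    static boolean demo() {
        Bus bus = new Bus();
        Node a = new Node("A", bus);
        Node b = new Node("B", bus);
        b.level1.put("user:1", "stale");
        a.put("user:1", "fresh");
        return !b.level1.containsKey("user:1") && "fresh".equals(a.level1.get("user:1"));
    }

    public static void main(String[] args) {
        System.out.println("stale entry evicted: " + demo());
    }
}
```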