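This article approaches the topic from three directions:
System startup: loading the J2CacheSpringRedisAutoConfiguration configuration
 
The J2Cache cluster policy, SpringRedisPubSubPolicy
 
Level-2 cache operations: SpringRedisProvider, SpringRedisGenericCache and SpringRedisCache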
 
 
1. Creating the RedisTemplate object
J2CacheSpringRedisAutoConfiguration is loaded after RedisAutoConfiguration and before J2CacheAutoConfiguration.
The Redis client is selected with j2cache.redis-client: Jedis or Lettuce. This walkthrough uses Lettuce.
A LettuceConnectionFactory is built from a RedisStandaloneConfiguration.
Finally, the RedisTemplate object is registered as a Spring bean.
 
```java
@Bean("j2CacheRedisTemplate")
public RedisTemplate<String, Serializable> j2CacheRedisTemplate(
        @Qualifier("j2CahceRedisConnectionFactory") RedisConnectionFactory j2CahceRedisConnectionFactory,
        @Qualifier("j2CacheValueSerializer") RedisSerializer<Object> j2CacheSerializer) {
    RedisTemplate<String, Serializable> template = new RedisTemplate<String, Serializable>();
    template.setKeySerializer(new StringRedisSerializer());
    template.setHashKeySerializer(new StringRedisSerializer());
    template.setDefaultSerializer(j2CacheSerializer);
    template.setConnectionFactory(j2CahceRedisConnectionFactory);
    return template;
}

@Primary
@Bean("j2CahceRedisConnectionFactory")
public LettuceConnectionFactory lettuceConnectionFactory(net.oschina.j2cache.J2CacheConfig j2CacheConfig) {
    LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder config =
            LettucePoolingClientConfiguration.builder();
    config.commandTimeout(Duration.ofMillis(5000));
    config.poolConfig(getGenericRedisPool());
    RedisStandaloneConfiguration single = new RedisStandaloneConfiguration("xxxx", 6379);
    single.setDatabase(10);
    single.setPassword("****");
    LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(single, config.build());
    return connectionFactory;
}
```
 
Now look at the RedisTemplate class:
RedisTemplate extends RedisAccessor and implements the RedisOperations and BeanClassLoaderAware interfaces. RedisAccessor holds the Redis connection factory, and RedisOperations defines the Redis operations.
The connectionFactory here is a LettuceConnectionFactory, whose afterPropertiesSet method does the preparatory work for talking to Redis:
```java
private @Nullable AbstractRedisClient client;
private @Nullable LettuceConnectionProvider connectionProvider;
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;

public void afterPropertiesSet() {
    this.client = createClient();
    this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
    this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
            createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
}

protected AbstractRedisClient createClient() {
    RedisURI uri = isDomainSocketAware()
            ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
            : createRedisURIAndApplySettings(getHostName(), getPort());

    RedisClient redisClient = clientConfiguration.getClientResources()
            .map(clientResources -> RedisClient.create(clientResources, uri))
            .orElseGet(() -> RedisClient.create(uri));
    clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);

    return redisClient;
}
```
 
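First, the RedisClient is created.
 
Next, the LettuceConnectionProvider is created. The decorator pattern is used here: StandaloneConnectionProvider -> LettucePoolingConnectionProvider -> ExceptionTranslatingConnectionProvider. The names already hint at what each layer adds:
 
StandaloneConnectionProvider: obtains a standalone Redis connection; it is built on Netty underneath.
 
LettucePoolingConnectionProvider: pools Redis connections using org.apache.commons.pool2.impl.GenericObjectPool.
 
ExceptionTranslatingConnectionProvider: translates Redis connection exceptions.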
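 
The layering can be illustrated with a minimal, stdlib-only sketch of the decorator pattern. Everything in it is hypothetical: ConnProvider, StandaloneProvider, PoolingProvider and TranslatingProvider are simplified stand-ins invented for this illustration, a "connection" is just a String, and the pool is a plain Deque rather than GenericObjectPool. It only shows how each layer wraps the next one; it is not the Spring Data Redis implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for LettuceConnectionProvider; not the Spring interface. */
interface ConnProvider {
    String getConnection();
}

/** Innermost layer: creates a brand-new "connection" on every call. */
class StandaloneProvider implements ConnProvider {
    private int created = 0;

    @Override
    public String getConnection() {
        created++;
        return "conn-" + created;
    }

    int createdCount() {
        return created;
    }
}

/** Middle layer: hands out an idle connection before asking the delegate for a new one. */
class PoolingProvider implements ConnProvider {
    private final ConnProvider delegate;
    private final Deque<String> idle = new ArrayDeque<>();

    PoolingProvider(ConnProvider delegate) {
        this.delegate = delegate;
    }

    @Override
    public String getConnection() {
        String pooled = idle.poll();
        return pooled != null ? pooled : delegate.getConnection();
    }

    void release(String connection) {
        idle.push(connection);
    }
}

/** Outermost layer: rewraps runtime exceptions from inner layers in a single exception type. */
class TranslatingProvider implements ConnProvider {
    private final ConnProvider delegate;

    TranslatingProvider(ConnProvider delegate) {
        this.delegate = delegate;
    }

    @Override
    public String getConnection() {
        try {
            return delegate.getConnection();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Unable to connect to Redis", e);
        }
    }
}

class DecoratorChainDemo {
    public static void main(String[] args) {
        StandaloneProvider standalone = new StandaloneProvider();
        PoolingProvider pooling = new PoolingProvider(standalone);
        ConnProvider provider = new TranslatingProvider(pooling);

        String first = provider.getConnection();
        pooling.release(first);
        String second = provider.getConnection(); // served from the idle queue
        System.out.println(first.equals(second) + ", created=" + standalone.createdCount());
    }
}
```

The caller only sees the outermost ConnProvider, so pooling and exception translation can be added or removed without changing calling code.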
 
 
2. Reading and writing Redis data
SpringRedisGenericCache stores data as plain key-value pairs and supports key expiry.
SpringRedisCache stores data in a hash structure.
 
The setBytes method of SpringRedisGenericCache serves as the example for analyzing how RedisTemplate operates on Redis.
```java
@Override
public void setBytes(String key, byte[] bytes, long timeToLiveInSeconds) {
    if (timeToLiveInSeconds <= 0) {
        log.debug(String.format("Invalid timeToLiveInSeconds value : %d , skipped it.", timeToLiveInSeconds));
        setBytes(key, bytes);
    } else {
        redisTemplate.execute((RedisCallback<List<byte[]>>) redis -> {
            redis.setEx(_key(key), (int) timeToLiveInSeconds, bytes);
            return null;
        });
    }
}
```
 
As the source shows, RedisTemplate's execute method takes a RedisCallback. The callback runs the low-level Redis code once a RedisConnection has been obtained. This design hides acquiring and releasing the connection from the caller, while leaving the concrete Redis operations open to the caller.
```java
@Override
@Nullable
public <T> T execute(RedisCallback<T> action) {
    return execute(action, isExposeConnection());
}

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");

    // 1. Get the connection factory
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    // 2. Get the connection
    RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);

    try {
        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);

        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }

        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
        // 3. Invoke the callback supplied by the caller
        T result = action.doInRedis(connToExpose);

        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }

        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        // 4. Release the connection
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}
```
 
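The template method pattern combined with a callback implements the whole flow of a Redis operation:
Get the RedisConnectionFactory; in this test it is a LettuceConnectionFactory.
Get a RedisConnection; underneath, this is LettuceConnectionFactory.getConnection().
Invoke the callback: the concrete Redis calls are controlled by the caller, and the RedisConnection is handed to the caller as a parameter.
Release the RedisConnection.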
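 
The four steps can be sketched without any Spring dependency. The sketch below is an assumption-laden miniature: MiniTemplate and FakeConnection are hypothetical names, the callback is a plain java.util.function.Function instead of RedisCallback, and the "connection" only records what happens to it. It shows why the finally block matters: the connection is released even when the callback throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical connection type that records every operation performed on it. */
class FakeConnection {
    final List<String> log = new ArrayList<>();

    String setEx(String key, long seconds, String value) {
        log.add("SETEX " + key + " " + seconds + " " + value);
        return "OK";
    }
}

/** Hypothetical miniature of the execute(callback) flow; not the Spring class. */
class MiniTemplate {
    private final FakeConnection connection = new FakeConnection();

    <T> T execute(Function<FakeConnection, T> action) {
        FakeConnection conn = acquire();   // get the connection
        try {
            return action.apply(conn);     // run the caller-supplied Redis logic
        } finally {
            release(conn);                 // always release, even on failure
        }
    }

    private FakeConnection acquire() {
        connection.log.add("acquire");
        return connection;
    }

    private void release(FakeConnection conn) {
        conn.log.add("release");
    }

    List<String> log() {
        return connection.log;
    }
}

class TemplateCallbackDemo {
    public static void main(String[] args) {
        MiniTemplate template = new MiniTemplate();
        String reply = template.execute(conn -> conn.setEx("k", 60, "v"));
        System.out.println(reply + " " + template.log());
    }
}
```

The caller writes only the lambda; acquiring and releasing stay inside the template, which is the division of responsibilities described above.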
 
2.1. Obtaining a RedisConnection
2.1.1. The Redis connection factory provided by Spring
RedisConnectionFactory -> ExceptionTranslatingConnectionProvider -> LettucePoolingConnectionProvider -> StandaloneConnectionProvider
```java
// LettuceConnectionFactory.SharedConnection
private StatefulConnection<E, E> getNativeConnection() {
    return connectionProvider.getConnection(StatefulConnection.class);
}

// ExceptionTranslatingConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    try {
        return delegate.getConnection(connectionType);
    } catch (RuntimeException e) {
        throw translateException(e);
    }
}

// LettucePoolingConnectionProvider
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(32);

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
        return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                poolConfig, false);
    });

    try {
        StatefulConnection<?, ?> connection = pool.borrowObject();
        poolRef.put(connection, pool);
        return connectionType.cast(connection);
    } catch (Exception e) {
        throw new PoolException("Could not get a resource from the pool", e);
    }
}

// StandaloneConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
    return getConnectionAsync(connectionType, redisURISupplier.get());
}

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
        return connectionType.cast(client.connectSentinel());
    }

    if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
        return connectionType.cast(client.connectPubSub(codec));
    }

    if (StatefulConnection.class.isAssignableFrom(connectionType)) {
        return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
                .orElseGet(() -> client.connect(codec)));
    }

    throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}
```
 
2.1.2. Creating the connection in Lettuce
Reading the code that creates the Redis connection is difficult in a few places:
Use of CompletableFuture
Lettuce's EventRecorder
Use of Reactor's Mono
Use of Netty
 
2.1.2.1. Entry point: connect
```java
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
    assertNotNull(redisURI);
    return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
}
```
 
Following connectStandaloneAsync -> connectStatefulAsync -> initializeChannelAsync -> initializeChannelAsync0 in turn leads to the source where Netty connects to Redis.
2.1.2.2. connectStandaloneAsync
This method mainly creates the StatefulRedisConnectionImpl object. StatefulRedisConnectionImpl exposes the RedisCommand objects to the outside and sends commands through the writer internally.
```java
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {
    assertNotNull(codec);
    checkValidRedisURI(redisURI);

    logger.debug("Trying to get a Redis connection for: {}", redisURI);

    DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
    RedisChannelWriter writer = endpoint;

    if (CommandExpiryWriter.isSupported(getOptions())) {
        writer = new CommandExpiryWriter(writer, getOptions(), getResources());
    }

    if (CommandListenerWriter.isSupported(getCommandListeners())) {
        writer = new CommandListenerWriter(writer, getCommandListeners());
    }

    StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

    ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
            () -> new CommandHandler(getOptions(), getResources(), endpoint));

    future.whenComplete((channelHandler, throwable) -> {
        if (throwable != null) {
            connection.closeAsync();
        }
    });

    return future;
}
```
 
2.1.2.3. connectStatefulAsync
Builds a ConnectionBuilder and uses it to create the connection.
```java
@SuppressWarnings("unchecked")
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint,
        RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
    ConnectionBuilder connectionBuilder;
    if (redisURI.isSsl()) {
        SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
        sslConnectionBuilder.ssl(redisURI);
        connectionBuilder = sslConnectionBuilder;
    } else {
        connectionBuilder = ConnectionBuilder.connectionBuilder();
    }

    ConnectionState state = connection.getConnectionState();
    state.apply(redisURI);
    state.setDb(redisURI.getDatabase());

    connectionBuilder.connection(connection);
    connectionBuilder.clientOptions(getOptions());
    connectionBuilder.clientResources(getResources());
    connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);

    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(), redisURI);
    connectionBuilder.connectionInitializer(createHandshake(state));

    ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

    return future.thenApply(channelHandler -> (S) connection);
}
```
 
getSocketAddressSupplier delegates to getSocketAddress, which creates the Mono lazily with Mono.defer.
```java
private Mono<SocketAddress> getSocketAddressSupplier(RedisURI redisURI) {
    return getSocketAddress(redisURI).doOnNext(addr -> logger.debug("Resolved SocketAddress {} using {}", addr, redisURI));
}

protected Mono<SocketAddress> getSocketAddress(RedisURI redisURI) {
    return Mono.defer(() -> {
        if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
            logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(),
                    redisURI.getSentinelMasterId());
            return lookupRedis(redisURI).switchIfEmpty(Mono.error(new RedisConnectionException(
                    "Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId())));
        } else {
            return Mono.fromCallable(() -> getResources().socketAddressResolver().resolve((redisURI)));
        }
    });
}
```
 
The connectionBuilder method creates the Netty client Bootstrap object and configures Netty's parameters.
```java
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
        ConnectionEvents connectionEvents, RedisURI redisURI) {
    Bootstrap redisBootstrap = new Bootstrap();
    redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
    connectionBuilder.bootstrap(redisBootstrap);

    connectionBuilder.apply(redisURI);
    connectionBuilder.configureBootstrap(!LettuceStrings.isEmpty(redisURI.getSocket()), this::getEventLoopGroup);

    connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents == this.connectionEvents ? connectionEvents
            : ConnectionEvents.of(this.connectionEvents, connectionEvents));

    connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}
```
 
2.1.2.4. initializeChannelAsync
Handles the SocketAddress and the Channel asynchronously, and finally calls initializeChannelAsync0, the method that actually obtains the physical connection.
```java
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
        ConnectionBuilder connectionBuilder) {
    Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();

    if (clientResources.eventExecutorGroup().isShuttingDown()) {
        throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
    }

    CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
    CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

    String uriString = connectionBuilder.getRedisURI().toString();

    EventRecorder.getInstance().record(
            new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));
    EventRecorder.RecordableEvent event = EventRecorder.getInstance()
            .start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

    channelReadyFuture.whenComplete((channel, throwable) -> {
        event.record();
    });

    socketAddressSupplier
            .doOnError(socketAddressFuture::completeExceptionally)
            .doOnNext(socketAddressFuture::complete)
            .subscribe(redisAddress -> {
                if (channelReadyFuture.isCancelled()) {
                    return;
                }
                initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
            }, channelReadyFuture::completeExceptionally);

    return new DefaultConnectionFuture<>(socketAddressFuture,
            channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));
}
```
 
2.1.2.5. initializeChannelAsync0
Drives Netty to create the connection. It relies heavily on CompletableFuture and other asynchronous programming constructs, which makes the code hard to follow.
```java
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
        SocketAddress redisAddress) {
    logger.debug("Connecting to Redis at {}", redisAddress);

    Bootstrap redisBootstrap = connectionBuilder.bootstrap();

    ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);
    redisBootstrap.handler(initializer);

    clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
    ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

    channelReadyFuture.whenComplete((c, t) -> {
        if (t instanceof CancellationException) {
            connectFuture.cancel(true);
        }
    });

    connectFuture.addListener(future -> {
        if (!future.isSuccess()) {
            logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
            connectionBuilder.endpoint().initialState();
            channelReadyFuture.completeExceptionally(future.cause());
            return;
        }

        RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);
        if (handshakeHandler == null) {
            channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
            return;
        }

        handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {
            if (throwable == null) {
                logger.debug("Connecting to Redis at {}: Success", redisAddress);
                RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                connection.registerCloseables(closeableResources, connection);
                channelReadyFuture.complete(connectFuture.channel());
                return;
            }

            logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
            connectionBuilder.endpoint().initialState();
            channelReadyFuture.completeExceptionally(throwable);
        });
    });
}
```
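 
The control flow of initializeChannelAsync0 is easier to see with Netty removed. The following stdlib-only sketch is a simplification under stated assumptions: the hypothetical connectFuture stands in for Netty's ChannelFuture, handshakeFuture stands in for the future returned by channelInitialized(), and a channel is just a String. It reproduces three behaviours of the original: cancellation of the outer future is propagated to the pending connect, a connect failure fails the outer future, and success is reported only after the handshake completes.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class ChannelBridgeDemo {

    /** Hypothetical miniature of the future wiring; all three futures are supplied by the caller. */
    static void bridge(CompletableFuture<String> connectFuture,
                       CompletableFuture<Void> handshakeFuture,
                       CompletableFuture<String> channelReadyFuture) {

        // If the caller cancels the outer future, cancel the pending connect attempt as well.
        channelReadyFuture.whenComplete((channel, error) -> {
            if (error instanceof CancellationException) {
                connectFuture.cancel(true);
            }
        });

        // When the connect attempt finishes, either fail fast or wait for the handshake.
        connectFuture.whenComplete((channel, connectError) -> {
            if (connectError != null) {
                channelReadyFuture.completeExceptionally(connectError);
                return;
            }
            handshakeFuture.whenComplete((ignored, handshakeError) -> {
                if (handshakeError == null) {
                    channelReadyFuture.complete(channel);
                } else {
                    channelReadyFuture.completeExceptionally(handshakeError);
                }
            });
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> connect = new CompletableFuture<>();
        CompletableFuture<Void> handshake = new CompletableFuture<>();
        CompletableFuture<String> ready = new CompletableFuture<>();
        bridge(connect, handshake, ready);

        connect.complete("channel-1");
        System.out.println("ready after connect only: " + ready.isDone());
        handshake.complete(null);
        System.out.println("ready after handshake: " + ready.join());
    }
}
```

Keeping a separate "ready" future is what lets the caller observe one result for a two-stage process (socket connect, then handshake).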
 
2.2. Redis operations
Taking a write as the example, this section analyzes how Spring operates on Redis through Lettuce.
```java
RedisConnection connection = connectionFactory.getConnection();
connection.set("key".getBytes(StandardCharsets.UTF_8), "123".getBytes(StandardCharsets.UTF_8));
```
 
The connection here is an org.springframework.data.redis.connection.lettuce.LettuceConnection.
```java
public interface RedisConnection extends RedisCommands, AutoCloseable {
    // members omitted
}

public interface RedisCommands extends RedisKeyCommands, RedisStringCommands, RedisListCommands, RedisSetCommands,
        RedisZSetCommands, RedisHashCommands, RedisTxCommands, RedisPubSubCommands, RedisConnectionCommands,
        RedisServerCommands, RedisStreamCommands, RedisScriptingCommands, RedisGeoCommands, RedisHyperLogLogCommands {

    @Nullable
    Object execute(String command, byte[]... args);
}
```
 
Stepping through with the debugger, the call passes through the following methods in order:
org.springframework.data.redis.connection.DefaultedRedisConnection#set(byte[], byte[]) 
org.springframework.data.redis.connection.lettuce.LettuceConnection#stringCommands 
org.springframework.data.redis.connection.lettuce.LettuceStringCommands#set(byte[], byte[]) 
 
2.2.1. LettuceInvoker provided by Spring: invoking Lettuce methods over the asynchronous connection
```java
@Override
public Boolean set(byte[] key, byte[] value) {
    return connection.invoke().from(RedisStringAsyncCommands::set, key, value)
            .get(Converters.stringToBooleanConverter());
}
```
 
2.2.1.1. The LettuceInvoker object returned by LettuceConnection#invoke()
LettuceInvoker is a utility for invoking Lettuce methods functionally. It is typically used to express the method call as a method reference and to pass the method arguments through one of the just or from methods.
The just methods record the method call and evaluate the result immediately.
The from methods allow composing a pipeline that transforms the result with a converter.
Usage example:
```java
LettuceInvoker invoker = …;
Long result = invoker.just(RedisGeoAsyncCommands::geoadd, key, point.getX(), point.getY(), member);
List<byte[]> result = invoker.fromMany(RedisGeoAsyncCommands::geohash, key, members).toList(it -> it.getValueOrElse(null));
```
The RedisFuture is managed by LettuceInvoker.
Its Synchronizer can either wait for completion or record the future together with its converter for further processing.
```java
LettuceInvoker invoke() {
    return invoke(getAsyncConnection());
}

LettuceInvoker invoke(RedisClusterAsyncCommands<byte[], byte[]> connection) {
    return doInvoke(connection, false);
}

private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {
    return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
        try {
            Object result = await(future.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    });
}

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
 
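getAsyncConnection obtains the asynchronous connection.
The analysis of obtaining a RedisConnection showed that StatefulRedisConnectionImpl holds a RedisAsyncCommandsImpl instance.
Here the connection's async property (that RedisAsyncCommandsImpl instance) is retrieved.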
 
```java
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    if (isQueueing() || isPipelined()) {
        return getAsyncDedicatedConnection();
    }
    if (asyncSharedConn != null) {
        if (asyncSharedConn instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
        }
        if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
        }
    }
    return getAsyncDedicatedConnection();
}
```
 
Setting and getting the async object:
```java
@Override
public RedisAsyncCommands<K, V> async() {
    return async;
}

public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
        Duration timeout) {
    super(writer, timeout);

    this.pushHandler = pushHandler;
    this.codec = codec;
    this.async = newRedisAsyncCommandsImpl();
    this.sync = newRedisSyncCommandsImpl();
    this.reactive = newRedisReactiveCommandsImpl();
}
```
 
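2.2.1.2. The from method of LettuceInvoker
The difficulty in reading this source lies in @FunctionalInterface functional interfaces and lambda expressions.
Two prerequisites:
When the Redis connection is created, io.lettuce.core.RedisClient#connectStandaloneAsync calls newStatefulRedisConnection, which ultimately instantiates a StatefulRedisConnectionImpl. Note its async property.
The connection property of the LettuceInvoker from the previous subsection is this async object, i.e. the RedisAsyncCommandsImpl instance.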
 
```java
// The call site, with the method reference expanded into its equivalent forms.
// Only one form can be active at a time, so the expanded forms are commented out.
public Boolean set(byte[] key, byte[] value) {
    // Form 1: anonymous class
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
    //         new LettuceInvoker.ConnectionFunction2<byte[], byte[], String>() {
    //     @Override
    //     public RedisFuture<String> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, byte[] key, byte[] value) {
    //         return connection.set(key, value);
    //     }
    // };

    // Form 2: lambda expression
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
    //         (connection, key1, value1) -> connection.set(key1, value1);

    // Form 3: method reference
    LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = RedisStringAsyncCommands::set;
    return connection.invoke().from(function, key, value)
            .get(Converters.stringToBooleanConverter());
}

@FunctionalInterface
interface ConnectionFunction2<T1, T2, R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, T1 t1, T2 t2);
}

<R, T1, T2> SingleInvocationSpec<R> from(ConnectionFunction2<T1, T2, R> function, T1 t1, T2 t2) {
    Assert.notNull(function, "ConnectionFunction must not be null");

    // Equivalent anonymous-class form:
    // ConnectionFunction0<R> function2 = new ConnectionFunction0<R>() {
    //     @Override
    //     public RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection) {
    //         return function.apply(connection, t1, t2);
    //     }
    // };
    ConnectionFunction0<R> function2 = connection -> function.apply(connection, t1, t2);
    return from(function2);
}

@FunctionalInterface
interface ConnectionFunction0<R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection);
}

<R> SingleInvocationSpec<R> from(ConnectionFunction0<R> function) {
    Assert.notNull(function, "ConnectionFunction must not be null");

    // Equivalent anonymous-class form:
    // Supplier<RedisFuture<R>> parent = new Supplier<RedisFuture<R>>() {
    //     @Override
    //     public RedisFuture<R> get() {
    //         return function.apply(connection);
    //     }
    // };
    Supplier<RedisFuture<R>> parent = () -> function.apply(connection);
    return new DefaultSingleInvocationSpec<>(parent, synchronizer);
}
```
 
As the code shows, Spring's LettuceInvoker orchestrates the call on RedisClusterAsyncCommands: it assembles a DefaultSingleInvocationSpec object, and then the get method of SingleInvocationSpec is called.
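 
The argument binding can be tried out in isolation. The sketch below is a hypothetical reduction: AsyncCommands, Function2, Function0 and MiniInvoker are invented stand-ins (mirroring the shapes of RedisClusterAsyncCommands, ConnectionFunction2, ConnectionFunction0 and LettuceInvoker), CompletableFuture replaces RedisFuture, and from returns the Supplier directly instead of a SingleInvocationSpec. It shows how a method reference is narrowed step by step, and that the command does not run until the Supplier is asked for its value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for the async command object; counts how often set is called. */
class AsyncCommands {
    int calls = 0;

    CompletableFuture<String> set(String key, String value) {
        calls++;
        return CompletableFuture.completedFuture("OK");
    }
}

/** Same shape as ConnectionFunction2: connection plus two arguments. */
@FunctionalInterface
interface Function2<T1, T2, R> {
    CompletableFuture<R> apply(AsyncCommands connection, T1 t1, T2 t2);
}

/** Same shape as ConnectionFunction0: connection only. */
@FunctionalInterface
interface Function0<R> {
    CompletableFuture<R> apply(AsyncCommands connection);
}

class MiniInvoker {
    private final AsyncCommands connection;

    MiniInvoker(AsyncCommands connection) {
        this.connection = connection;
    }

    /** Binds the two arguments, reducing Function2 to Function0. */
    <R, T1, T2> Supplier<CompletableFuture<R>> from(Function2<T1, T2, R> function, T1 t1, T2 t2) {
        return from(conn -> function.apply(conn, t1, t2));
    }

    /** Binds the connection, leaving a Supplier that runs the command only when get() is called. */
    <R> Supplier<CompletableFuture<R>> from(Function0<R> function) {
        return () -> function.apply(connection);
    }
}

class FromDemo {
    public static void main(String[] args) {
        AsyncCommands commands = new AsyncCommands();
        MiniInvoker invoker = new MiniInvoker(commands);

        Supplier<CompletableFuture<String>> parent = invoker.from(AsyncCommands::set, "key", "123");
        System.out.println("calls before get(): " + commands.calls);
        System.out.println(parent.get().join() + ", calls after get(): " + commands.calls);
    }
}
```

Deferring execution behind a Supplier is what allows a separate component to decide when the command is actually issued and how its result is awaited.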
2.2.1.3. The get method of SingleInvocationSpec
```java
@Override
public <T> T get(Converter<S, T> converter) {
    Assert.notNull(converter, "Converter must not be null");
    return synchronizer.invoke(parent, converter, () -> null);
}
```
 
converter is Converters.stringToBooleanConverter(), which converts the returned "OK" string to Boolean true.
parent is the Supplier that yields the RedisFuture.
 
synchronizer is a LettuceInvoker.Synchronizer. As section 2.2.1.1 shows, it is passed in when the LettuceInvoker is constructed:
```java
LettuceInvoker.Synchronizer synchronizer = new LettuceInvoker.Synchronizer() {
    @Override
    public Object doInvoke(Supplier<RedisFuture<Object>> futureSupplier, Converter<Object, Object> converter,
            Supplier<Object> nullDefault) {
        try {
            Object result = await(futureSupplier.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    }
};

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
 
Calling futureSupplier.get() triggers the RedisStringAsyncCommands#set(K, V) method.
await then waits up to timeout for the result of the RedisFuture.
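 
The trigger, wait and convert sequence can be approximated with the standard library alone. This is a sketch under assumptions, not the Spring code: MiniSynchronizer is a hypothetical class, CompletableFuture replaces RedisFuture, java.util.function.Function replaces Converter, and IllegalStateException stands in for the translated data-access exception. The timeout handling imitates what the name awaitOrCancel suggests (cancel the command when the wait times out).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical miniature of the synchronizer: trigger, await with timeout, then convert. */
class MiniSynchronizer {
    private final long timeoutMillis;

    MiniSynchronizer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    <S, T> T invoke(Supplier<CompletableFuture<S>> futureSupplier,
                    Function<S, T> converter,
                    Supplier<T> nullDefault) {
        S result = await(futureSupplier.get());   // get() is what issues the command
        if (result == null) {
            return nullDefault.get();
        }
        return converter.apply(result);
    }

    /** Blocks for the result; on timeout the future is cancelled and an unchecked exception is thrown. */
    private <S> S await(CompletableFuture<S> future) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException("Command timed out after " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the command", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Command failed", e.getCause());
        }
    }
}

class SynchronizerDemo {
    public static void main(String[] args) {
        MiniSynchronizer synchronizer = new MiniSynchronizer(100);
        Supplier<CompletableFuture<String>> parent = () -> CompletableFuture.completedFuture("OK");

        // Mirrors the string-to-boolean conversion: "OK" becomes true.
        Boolean stored = synchronizer.invoke(parent, reply -> "OK".equals(reply), () -> null);
        System.out.println(stored);
    }
}
```

The blocking wait lives in one place, so every command built through the invoker gets the same timeout and error handling.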
 
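This finally reaches the place where the Redis command itself is executed.
2.2.2. Execution flow of the set method of RedisStringAsyncCommands
2.3. Releasing the RedisConnection
3. Redis publish/subscribe
SpringRedisPubSubPolicy registers listeners for the expiry and deletion of key-value entries in Redis.
SpringRedisPubSubPolicy#publish publishes messages.
SpringRedisMessageListener listens for subscribed messages.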
 