Taking a data insert as an example, this article analyzes how Spring operates Redis through Lettuce.
```java
// connectionFactory is assumed to be an injected RedisConnectionFactory backed by Lettuce.
RedisConnection connection = connectionFactory.getConnection();
connection.set("key".getBytes(StandardCharsets.UTF_8), "123".getBytes(StandardCharsets.UTF_8));
```
The connection here is an org.springframework.data.redis.connection.lettuce.LettuceConnection. It implements the RedisConnection interface, which in turn extends RedisCommands, the interface that defines the various Redis operations.
```java
public interface RedisConnection extends RedisCommands, AutoCloseable {
    // body omitted
}

public interface RedisCommands extends RedisKeyCommands, RedisStringCommands, RedisListCommands, RedisSetCommands,
        RedisZSetCommands, RedisHashCommands, RedisTxCommands, RedisPubSubCommands, RedisConnectionCommands,
        RedisServerCommands, RedisStreamCommands, RedisScriptingCommands, RedisGeoCommands, RedisHyperLogLogCommands {

    @Nullable
    Object execute(String command, byte[]... args);
}
```
1. Operating Lettuce through LettuceInvoker

```java
@Override
public Boolean set(byte[] key, byte[] value) {
    // Build the invocation from a method reference, then convert the "OK" reply to a Boolean.
    return connection.invoke().from(RedisStringAsyncCommands::set, key, value)
            .get(Converters.stringToBooleanConverter());
}
```
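1.1. Creating the LettuceInvoker object

The Javadoc of the LettuceInvoker class explains:

Utility for functional invocation of Lettuce methods. It is typically used to express the method call as a method reference and to pass the method arguments through one of the just or from methods.
- just methods record the method call and evaluate the method result immediately.
- from methods allow composing a functional pipeline that transforms the result using a Converter.

Usage example:
```java
LettuceInvoker invoker = …;
Long result = invoker.just(RedisGeoAsyncCommands::geoadd, key, point.getX(), point.getY(), member);
List<byte[]> result = invoker.fromMany(RedisGeoAsyncCommands::geohash, key, members).toList(it -> it.getValueOrElse(null));
```
The actual translation from the RedisFuture is delegated to a Synchronizer, which can either await completion or record the future together with its Converter for further processing.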
```java
LettuceInvoker invoke() {
    return invoke(getAsyncConnection());
}

LettuceInvoker invoke(RedisClusterAsyncCommands<byte[], byte[]> connection) {
    return doInvoke(connection, false);
}

private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {
    // The lambda is the Synchronizer: trigger the command, wait for it, then convert the result.
    return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
        try {
            Object result = await(future.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    });
}

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        // Block for at most `timeout` milliseconds; the command is cancelled on timeout.
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
getAsyncConnection obtains the asynchronous connection:
```java
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    // Transactions and pipelines need a dedicated connection.
    if (isQueueing() || isPipelined()) {
        return getAsyncDedicatedConnection();
    }
    if (asyncSharedConn != null) {
        if (asyncSharedConn instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
        }
        if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
        }
    }
    return getAsyncDedicatedConnection();
}
```
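The earlier analysis of how a RedisConnection is obtained showed that StatefulRedisConnectionImpl holds a RedisAsyncCommandsImpl instance. The async() call on the connection here returns that RedisAsyncCommandsImpl object.
Note: how the async field is set and read: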
```java
@Override
public RedisAsyncCommands<K, V> async() {
    return async;
}

public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
        Duration timeout) {

    super(writer, timeout);

    this.pushHandler = pushHandler;
    this.codec = codec;

    // The three API flavours are created once per connection.
    this.async = newRedisAsyncCommandsImpl();
    this.sync = newRedisSyncCommandsImpl();
    this.reactive = newRedisReactiveCommandsImpl();
}
```
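1.2. The from method of LettuceInvoker

The difficulty in reading this source lies in the @FunctionalInterface functional interfaces and the lambda expressions.
Two preconditions come first:
- When the Redis connection is created, io.lettuce.core.RedisClient#connectStandaloneAsync calls newStatefulRedisConnection, which ultimately creates a StatefulRedisConnectionImpl object. Note its async field.
- The connection field of LettuceInvoker is this async, that is, the RedisAsyncCommandsImpl object.

The source below uses anonymous inner classes, lambda expressions and method references to show several equivalent ways of creating the same object.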
```java
public Boolean set(byte[] key, byte[] value) {
    // Variant 1: anonymous inner class
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
    //         new LettuceInvoker.ConnectionFunction2<byte[], byte[], String>() {
    //     @Override
    //     public RedisFuture<String> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, byte[] key, byte[] value) {
    //         return connection.set(key, value);
    //     }
    // };

    // Variant 2: lambda expression
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
    //         (connection, key1, value1) -> connection.set(key1, value1);

    // Variant 3: method reference
    LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = RedisStringAsyncCommands::set;

    return connection.invoke().from(function, key, value)
            .get(Converters.stringToBooleanConverter());
}

@FunctionalInterface
interface ConnectionFunction2<T1, T2, R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, T1 t1, T2 t2);
}

<R, T1, T2> SingleInvocationSpec<R> from(ConnectionFunction2<T1, T2, R> function, T1 t1, T2 t2) {
    Assert.notNull(function, "ConnectionFunction must not be null");

    // Variant 1: anonymous inner class
    // ConnectionFunction0<R> function2 = new ConnectionFunction0<R>() {
    //     @Override
    //     public RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection) {
    //         return function.apply(connection, t1, t2);
    //     }
    // };

    // Variant 2: lambda expression that binds t1 and t2
    ConnectionFunction0<R> function2 = connection -> function.apply(connection, t1, t2);
    return from(function2);
}

@FunctionalInterface
interface ConnectionFunction0<R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection);
}

<R> SingleInvocationSpec<R> from(ConnectionFunction0<R> function) {
    Assert.notNull(function, "ConnectionFunction must not be null");

    // Variant 1: anonymous inner class
    // Supplier<RedisFuture<R>> parent = new Supplier<RedisFuture<R>>() {
    //     @Override
    //     public RedisFuture<R> get() {
    //         return function.apply(connection);
    //     }
    // };

    // Variant 2: lambda expression that binds the connection; nothing is executed yet
    Supplier<RedisFuture<R>> parent = () -> function.apply(connection);
    return new DefaultSingleInvocationSpec<>(parent, synchronizer);
}
```
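As the code shows, Spring's LettuceInvoker turns the call on RedisClusterAsyncCommands into a deferred invocation: it assembles a DefaultSingleInvocationSpec object, and the command only runs when the get method of SingleInvocationSpec is finally called.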
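The composition can be reproduced without Spring or Lettuce. The following is a minimal sketch under stated assumptions: `CompletableFuture` stands in for `RedisFuture`, and `FakeAsyncCommands`, `Function2`, `MiniInvoker` and `Spec` are hypothetical names invented for this illustration, not library types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-ins; none of these are Spring Data Redis or Lettuce types.
class MiniInvokerSketch {

    // Plays the role of the async command object.
    static class FakeAsyncCommands {
        CompletableFuture<String> set(String key, String value) {
            return CompletableFuture.completedFuture("OK");
        }
    }

    // Same shape as ConnectionFunction2: the connection plus two arguments.
    @FunctionalInterface
    interface Function2<T1, T2, R> {
        CompletableFuture<R> apply(FakeAsyncCommands connection, T1 t1, T2 t2);
    }

    static class MiniInvoker {
        private final FakeAsyncCommands connection;

        MiniInvoker(FakeAsyncCommands connection) {
            this.connection = connection;
        }

        // Binds the connection and the arguments into a Supplier; nothing runs yet.
        <R, T1, T2> Spec<R> from(Function2<T1, T2, R> function, T1 t1, T2 t2) {
            Supplier<CompletableFuture<R>> parent = () -> function.apply(connection, t1, t2);
            return new Spec<>(parent);
        }
    }

    // Plays the role of the invocation spec: trigger, wait, convert.
    static class Spec<S> {
        private final Supplier<CompletableFuture<S>> parent;

        Spec(Supplier<CompletableFuture<S>> parent) {
            this.parent = parent;
        }

        <T> T get(Function<S, T> converter) {
            try {
                // parent.get() is the point where the command is actually invoked.
                S result = parent.get().get(1, TimeUnit.SECONDS);
                return result == null ? null : converter.apply(result);
            } catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    static Boolean set(String key, String value) {
        MiniInvoker invoker = new MiniInvoker(new FakeAsyncCommands());
        return invoker.from(FakeAsyncCommands::set, key, value).get("OK"::equals);
    }

    public static void main(String[] args) {
        System.out.println(set("key", "123"));
    }
}
```

The point of the design is that `from` only captures state in closures; the side effect happens inside `get`, which is why a breakpoint in `set` is hit from there and not from `from`.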
1.3. The get method of SingleInvocationSpec

```java
@Override
public <T> T get(Converter<S, T> converter) {
    Assert.notNull(converter, "Converter must not be null");
    // Hand the deferred call, the converter and the null default to the synchronizer.
    return synchronizer.invoke(parent, converter, () -> null);
}
```
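- In this test the converter is Converters.stringToBooleanConverter(), which converts the returned "OK" string to Boolean true.
- parent is the Supplier that produces the RedisFuture.

synchronizer is the LettuceInvoker.Synchronizer; it is passed in when the LettuceInvoker object is created (see section 1.1).
The creation of the LettuceInvoker.Synchronizer object, written another way: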
```java
LettuceInvoker.Synchronizer synchronizer = new LettuceInvoker.Synchronizer() {
    @Override
    public Object doInvoke(Supplier<RedisFuture<Object>> futureSupplier, Converter<Object, Object> converter, Supplier<Object> nullDefault) {
        try {
            // futureSupplier.get() triggers the command; await blocks for the result.
            Object result = await(futureSupplier.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    }
};

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
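- Calling futureSupplier.get() unwinds the layers composed by LettuceInvoker and finally triggers RedisAsyncCommandsImpl#set(K, V). The source is hard to follow by reading alone; stepping through it in a debugger makes it somewhat clearer.
- await then waits up to timeout for the result of the RedisFuture.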
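The "wait up to a timeout, otherwise cancel" behaviour can be illustrated with plain JDK types. This is a minimal sketch, assuming `CompletableFuture` as a stand-in for `RedisFuture`; the method `awaitOrCancel` below is a hypothetical helper written for this example and is not the Lettuce implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AwaitOrCancelSketch {

    // Waits up to timeoutMillis for the result; on timeout the future is cancelled.
    static <T> T awaitOrCancel(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException("Command timed out after " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        // Another thread (the I/O thread in the article) completes the future later.
        new Thread(() -> reply.complete("OK")).start();
        System.out.println(awaitOrCancel(reply, 1000));

        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            awaitOrCancel(never, 50);
        } catch (IllegalStateException e) {
            System.out.println("cancelled: " + never.isCancelled());
        }
    }
}
```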
 
This finally brings us to the place where the Redis command is actually executed.
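2. Execution flow of the set method of RedisStringAsyncCommands

RedisAsyncCommandsImpl extends the abstract class AbstractRedisAsyncCommands, and the core implementation of set lives in that abstract class: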
```java
private final RedisCommandBuilder<K, V> commandBuilder;

@Override
public RedisFuture<String> set(K key, V value) {
    // Build the command object first, then dispatch it.
    return dispatch(commandBuilder.set(key, value));
}
```
2.1. Assembling the Redis command

```java
Command<K, V, String> set(K key, V value) {
    // SET replies with a status string, hence StatusOutput.
    return createCommand(SET, new StatusOutput<>(codec), key, value);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
    CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addValue(value);
    return createCommand(type, output, args);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
    return new Command<>(type, output, args);
}

public class Command<K, V, T> implements RedisCommand<K, V, T> {

    // Command type, for example SET
    private final ProtocolKeyword type;

    // Command arguments (key and value)
    protected CommandArgs<K, V> args;

    // Holder for the decoded reply
    protected CommandOutput<K, V, T> output;
}
```
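The type and arguments held by the command object are what eventually goes over the wire. In the Redis serialization protocol a client sends a command as an array of bulk strings. The following is a minimal sketch of that wire format for `SET key 123`; `RespEncodeSketch` and its methods are hypothetical names for this illustration and do not reproduce Lettuce's encoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class RespEncodeSketch {

    // Encodes a command as a RESP array of bulk strings:
    // "*<count>\r\n" followed by "$<length>\r\n<bytes>\r\n" for every part.
    static byte[] encode(byte[]... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeAscii(out, "*" + parts.length + "\r\n");
        for (byte[] part : parts) {
            writeAscii(out, "$" + part.length + "\r\n");
            out.write(part, 0, part.length);
            writeAscii(out, "\r\n");
        }
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        out.write(bytes, 0, bytes.length);
    }

    static String encodeSet(String key, String value) {
        byte[] wire = encode(
                "SET".getBytes(StandardCharsets.US_ASCII),
                key.getBytes(StandardCharsets.UTF_8),
                value.getBytes(StandardCharsets.UTF_8));
        return new String(wire, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Make the CRLF separators visible when printing.
        System.out.println(encodeSet("key", "123").replace("\r\n", "\\r\\n"));
    }
}
```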
2.2. Sending the Redis command

```java
// AbstractRedisAsyncCommands
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    // Wrap the command so that its completion can be observed as a future.
    AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);

    RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);

    if (dispatched instanceof AsyncCommand) {
        return (AsyncCommand<K, V, T>) dispatched;
    }
    return asyncCommand;
}

// StatefulRedisConnectionImpl
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
    RedisCommand<K, V, T> toSend = preProcessCommand(command);

    potentiallyEnableMulti(command);

    return super.dispatch(toSend);
}

// RedisChannelHandler
private final RedisChannelWriter channelWriter;

protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    if (debugEnabled) {
        logger.debug("dispatching command {}", cmd);
    }

    if (tracingEnabled) {
        RedisCommand<K, V, T> commandToSend = cmd;
        TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
        if (provider == null) {
            commandToSend = new TracedCommand<>(cmd, clientResources.tracing().initialTraceContextProvider().getTraceContext());
        }
        return channelWriter.write(commandToSend);
    }

    return channelWriter.write(cmd);
}
```
First, what is channelWriter? Following the RedisChannelHandler constructor upwards leads back to io.lettuce.core.RedisClient#connectStandaloneAsync, the method that obtains the Redis connection:
```java
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {

    assertNotNull(codec);
    checkValidRedisURI(redisURI);

    logger.debug("Trying to get a Redis connection for: {}", redisURI);

    // The endpoint is the innermost RedisChannelWriter.
    DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
    RedisChannelWriter writer = endpoint;

    // Optionally wrap the writer with command-expiry support.
    if (CommandExpiryWriter.isSupported(getOptions())) {
        writer = new CommandExpiryWriter(writer, getOptions(), getResources());
    }

    // Optionally wrap the writer with command listeners.
    if (CommandListenerWriter.isSupported(getCommandListeners())) {
        writer = new CommandListenerWriter(writer, getCommandListeners());
    }

    StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

    ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
            () -> new CommandHandler(getOptions(), getResources(), endpoint));

    // Close the connection if establishing it failed.
    future.whenComplete((channelHandler, throwable) -> {
        if (throwable != null) {
            connection.closeAsync();
        }
    });

    return future;
}
```
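The way connectStandaloneAsync reassigns `writer` step by step is the decorator pattern: each wrapper implements the same interface as the object it wraps and adds behaviour around the delegated call. The following is a minimal sketch of that shape; `Writer`, `EndpointWriter` and `ListeningWriter` are hypothetical types for this illustration, not Lettuce's RedisChannelWriter hierarchy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration only.
class WriterDecoratorSketch {

    interface Writer {
        String write(String command);
    }

    // Innermost writer, standing in for the endpoint that owns the channel.
    static class EndpointWriter implements Writer {
        final List<String> sent = new ArrayList<>();

        @Override
        public String write(String command) {
            sent.add(command);
            return command;
        }
    }

    // Decorator: adds behaviour around the delegate without changing it.
    static class ListeningWriter implements Writer {
        private final Writer delegate;
        private final List<String> events;

        ListeningWriter(Writer delegate, List<String> events) {
            this.delegate = delegate;
            this.events = events;
        }

        @Override
        public String write(String command) {
            events.add("started " + command);
            String written = delegate.write(command);
            events.add("written " + command);
            return written;
        }
    }

    // Wraps the endpoint only when the feature is enabled.
    static Writer build(EndpointWriter endpoint, boolean listenersEnabled, List<String> events) {
        Writer writer = endpoint;
        if (listenersEnabled) {
            writer = new ListeningWriter(writer, events);
        }
        return writer;
    }

    public static void main(String[] args) {
        EndpointWriter endpoint = new EndpointWriter();
        List<String> events = new ArrayList<>();
        build(endpoint, true, events).write("SET key 123");
        System.out.println(endpoint.sent + " " + events);
    }
}
```

Callers only ever see the `Writer` interface, so features can be switched on or off at connection time without touching the code that dispatches commands.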
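The source shows that channelWriter is the DefaultEndpoint object, possibly wrapped by decorators that add features such as command expiry (CommandExpiryWriter) and command listeners (CommandListenerWriter). Next, the write method of channelWriter: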
```java
@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

    LettuceAssert.notNull(command, "Command must not be null");

    // Reject the write early if the endpoint cannot accept it.
    RedisException validation = validateWrite(1);
    if (validation != null) {
        command.completeExceptionally(validation);
        return command;
    }

    try {
        sharedLock.incrementWriters();

        if (inActivation) {
            command = processActivationCommand(command);
        }

        if (autoFlushCommands) {
            if (isConnected()) {
                // Connected: write to the channel and flush immediately.
                writeToChannelAndFlush(command);
            } else {
                // Disconnected: keep the command in the disconnected buffer.
                writeToDisconnectedBuffer(command);
            }
        } else {
            // Auto-flush disabled: buffer until flushed explicitly.
            writeToBuffer(command);
        }
    } finally {
        sharedLock.decrementWriters();
        if (debugEnabled) {
            logger.debug("{} write() done", logPrefix());
        }
    }

    return command;
}

private static final AtomicIntegerFieldUpdater<DefaultEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater.newUpdater(DefaultEndpoint.class, "queueSize");

@SuppressWarnings("unused")
private volatile int queueSize = 0;

private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {

    QUEUE_SIZE.incrementAndGet(this);

    ChannelFuture channelFuture = channelWriteAndFlush(command);

    if (reliability == Reliability.AT_MOST_ONCE) {
        channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
    }

    if (reliability == Reliability.AT_LEAST_ONCE) {
        channelFuture.addListener(RetryListener.newInstance(this, command));
    }
}
```
- AtomicIntegerFieldUpdater updates the int field queueSize of the DefaultEndpoint object atomically, so the field does not need to be declared as an AtomicInteger; a volatile int is enough.
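A self-contained sketch of the same technique, using the JDK's `AtomicIntegerFieldUpdater` on a plain class. `QueueSizeSketch` and its methods are names invented for this example; only the updater API is real.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class QueueSizeSketch {

    // One shared updater per class instead of one AtomicInteger object per instance.
    private static final AtomicIntegerFieldUpdater<QueueSizeSketch> QUEUE_SIZE =
            AtomicIntegerFieldUpdater.newUpdater(QueueSizeSketch.class, "queueSize");

    // The updater requires a non-static volatile int field.
    @SuppressWarnings("unused")
    private volatile int queueSize = 0;

    void enqueue() {
        QUEUE_SIZE.incrementAndGet(this);
    }

    int size() {
        return QUEUE_SIZE.get(this);
    }

    // Increments the counter from several threads and returns the final value.
    static int countConcurrently(int threads, int perThread) {
        QueueSizeSketch endpoint = new QueueSizeSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    endpoint.enqueue();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return endpoint.size();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 10_000));
    }
}
```

The usual motivation for this choice is memory footprint: an object that exists in large numbers carries a 4-byte field instead of a reference to a separate AtomicInteger.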
 
```java
protected volatile Channel channel;

private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {

    if (debugEnabled) {
        logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
    }

    // Hand the command over to the Netty channel.
    return channel.writeAndFlush(command);
}
```
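At this point the Redis command has been written to the Netty Channel. The Channel places the command in the task queue of its EventLoop, and the work of the calling thread ends here. The EventLoop is a SingleThreadEventExecutor; it calls back the write method of the CommandHandler that was configured when the Netty Bootstrap was created. CommandHandler extends io.netty.channel.ChannelDuplexHandler. How the channel is created, how Channel and EventLoop relate, and how Netty processes events are left for a later study of Netty.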
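The hand-off from the calling thread to a single I/O thread can be modelled with a JDK single-thread executor. This is only an analogy under that assumption, not Netty's event loop: `EventLoopSketch` is a hypothetical name, and the "handler" is reduced to appending to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopSketch {

    // Queues every command on one worker thread and returns the order in which they were handled.
    static List<String> writeAll(List<String> commands) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> handled = new ArrayList<>(); // only touched by the worker thread
        for (String command : commands) {
            // The caller returns immediately; the task waits in the executor's queue.
            eventLoop.execute(() -> {
                handled.add(command);
            });
        }
        eventLoop.shutdown();
        try {
            if (!eventLoop.awaitTermination(1, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> commands = new ArrayList<>();
        commands.add("SET key 123");
        commands.add("GET key");
        System.out.println(writeAll(commands));
    }
}
```

Because a single thread drains the queue, tasks submitted by one caller run in submission order, which is one of the ordering properties the later sections rely on.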
2.3. CommandHandler writes the command to Redis

```java
@Override
@SuppressWarnings("unchecked")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

    if (debugEnabled) {
        logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
    }

    // Single command
    if (msg instanceof RedisCommand) {
        writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
        return;
    }

    // Batch of commands
    if (msg instanceof List) {
        List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;
        if (batch.size() == 1) {
            writeSingleCommand(ctx, batch.get(0), promise);
            return;
        }
        writeBatch(ctx, batch, promise);
        return;
    }

    if (msg instanceof Collection) {
        writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
    }
}

private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {

    if (!isWriteable(command)) {
        promise.trySuccess();
        return;
    }

    // Remember the command so that the reply can be matched to it later.
    addToStack(command, promise);

    attachTracing(ctx, command);

    // Pass the command on to the next outbound handler in the pipeline.
    ctx.write(command, promise);
}

private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {

    try {
        validateWrite(1);

        if (command.getOutput() == null) {
            // No reply expected: complete immediately.
            complete(command);
        }

        RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);

        if (promise.isVoid()) {
            stack.add(redisCommand);
        } else {
            // Add to the stack once the write has completed.
            promise.addListener(AddToStack.newInstance(stack, redisCommand));
        }
    } catch (Exception e) {
        command.completeExceptionally(e);
        throw e;
    }
}
```
Only at this step is the Redis command actually sent to Redis.
2.4. CommandHandler reads the command result from Redis

After Netty receives a message from the Redis server, it calls back the channelRead method of CommandHandler:
```java
private ByteBuf buffer;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    ByteBuf input = (ByteBuf) msg;

    input.touch("CommandHandler.read(…)");

    if (!input.isReadable() || input.refCnt() == 0) {
        logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt());
        return;
    }

    if (debugEnabled) {
        logger.debug("{} Received: {} bytes, {} commands in the stack", logPrefix(), input.readableBytes(), stack.size());
    }

    try {
        if (buffer.refCnt() < 1) {
            logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
            return;
        }

        if (debugEnabled && ctx.channel() != channel) {
            logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel());
            return;
        }

        if (traceEnabled) {
            logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
        }

        // Append the received bytes to the aggregation buffer, then decode.
        buffer.touch("CommandHandler.read(…)");
        buffer.writeBytes(input);

        decode(ctx, buffer);
    } finally {
        input.release();
    }
}

private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {

    // The reply belongs to the oldest command that is still waiting.
    RedisCommand<?, ?, ?> command = stack.peek();

    try {
        if (!decode(ctx, buffer, command)) {
            hasDecodeProgress = true;
            decodeBufferPolicy.afterPartialDecode(buffer);
            return;
        }
    } catch (Exception e) {
        ctx.close();
        throw e;
    }
}
```
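Once decoding completes, command.complete() is called, that is, io.lettuce.core.protocol.AsyncCommand#complete.
Its core logic puts the result obtained from Redis into the CompletableFuture, so that LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS) from section 1.3 can retrieve it.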
```java
// CommandHandler
private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {

    RedisCommand<?, ?, ?> command = stack.peek();

    try {
        if (!decode(ctx, buffer, command)) {
            hasDecodeProgress = true;
            decodeBufferPolicy.afterPartialDecode(buffer);
            return;
        }
    } catch (Exception e) {
        ctx.close();
        throw e;
    }
}

private RedisStateMachine rsm;

protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
    return rsm.decode(buffer, output, command::completeExceptionally);
}

// RedisStateMachine
public boolean decode(ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) {

    buffer.touch("RedisStateMachine.decode(…)");

    if (isEmpty(stack)) {
        add(stack, new State());
    }

    if (output == null) {
        return isEmpty(stack);
    }

    boolean resp3Indicator = doDecode(buffer, output, errorHandler);

    if (debugEnabled) {
        logger.debug("Decode done, empty stack: {}", isEmpty(stack));
    }

    if (isDiscoverProtocol()) {
        if (resp3Indicator) {
            setProtocolVersion(ProtocolVersion.RESP3);
        } else {
            setProtocolVersion(ProtocolVersion.RESP2);
        }
    }

    return isEmpty(stack);
}

private boolean doDecode(ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) {

    boolean resp3Indicator = false;

    State.Result result;

    while (!isEmpty(stack)) {
        State state = peek(stack);

        if (state.type == null) {
            if (!buffer.isReadable()) {
                break;
            }
            // The first byte of a reply identifies its type.
            state.type = readReplyType(buffer);

            if (state.type == HELLO_V3 || state.type == MAP) {
                resp3Indicator = true;
            }

            buffer.markReaderIndex();
        }

        // Each reply type has its own handler, for example handleSingle.
        result = state.type.handle(this, state, buffer, output, errorHandler);
        if (State.Result.BREAK_LOOP.equals(result)) {
            break;
        } else if (State.Result.CONTINUE_LOOP.equals(result)) {
            continue;
        }
        buffer.markReaderIndex();
        remove(stack);

        output.complete(size(stack));
    }

    return resp3Indicator;
}

static State.Result handleSingle(RedisStateMachine rsm, State state, ByteBuf buffer, CommandOutput<?, ?, ?> output,
        Consumer<Exception> errorHandler) {

    ByteBuffer bytes;

    // A null line means the reply is incomplete; wait for more bytes.
    if ((bytes = rsm.readLine(buffer)) == null) {
        return State.Result.BREAK_LOOP;
    }

    if (!QUEUED.equals(bytes)) {
        rsm.safeSetSingle(output, bytes, errorHandler);
    }
    return State.Result.NORMAL_END;
}

protected void safeSetSingle(CommandOutput<?, ?, ?> output, ByteBuffer bytes, Consumer<Exception> errorHandler) {

    try {
        output.set(bytes);
    } catch (Exception e) {
        errorHandler.accept(e);
    }
}

public class StatusOutput<K, V> extends CommandOutput<K, V, String> {

    private static final ByteBuffer OK = StandardCharsets.US_ASCII.encode("OK");

    public StatusOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        // Reuse the constant for the common "OK" reply.
        output = OK.equals(bytes) ? "OK" : decodeAscii(bytes);
    }
}
```
In short: after the response from Redis arrives, it is decoded and finally written to the output field of the Redis command object.
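For the SET command the reply is a RESP simple string, `+OK\r\n`. The following is a minimal sketch of decoding such a reply, including the "incomplete line, wait for more bytes" case that the state machine reports as BREAK_LOOP. `StatusDecodeSketch` is a hypothetical name, and the sketch uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class StatusDecodeSketch {

    // Returns the text of a RESP simple string ("+OK\r\n" gives "OK") and consumes it,
    // or returns null and leaves the buffer untouched when the line is still incomplete.
    static String decodeStatus(ByteBuffer buffer) {
        int start = buffer.position();
        if (!buffer.hasRemaining()) {
            return null;
        }
        if (buffer.get(start) != '+') {
            throw new IllegalArgumentException("not a simple string reply");
        }
        for (int i = start + 1; i + 1 < buffer.limit(); i++) {
            if (buffer.get(i) == '\r' && buffer.get(i + 1) == '\n') {
                byte[] line = new byte[i - (start + 1)];
                buffer.position(start + 1);
                buffer.get(line);
                buffer.position(i + 2); // skip the trailing CRLF
                return new String(line, StandardCharsets.US_ASCII);
            }
        }
        return null; // partial reply: the caller should wait for more bytes
    }

    public static void main(String[] args) {
        ByteBuffer reply = ByteBuffer.wrap("+OK\r\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(decodeStatus(reply));
    }
}
```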
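2.5. How does Lettuce keep sent commands and returned results in one-to-one correspondence?

- A Lettuce connection talks to Redis over a single TCP connection.
- Lettuce adds Redis commands to the stack in order (the code mainly uses ArrayDeque<RedisCommand<?, ?, ?>>, an array-backed double-ended queue).
- TCP itself delivers bytes in order.
- Redis processes requests on a single thread, so the replies it returns are ordered as well.
- Lettuce handles disconnects and reconnects through ConnectionWatchdog.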
 
Together these guarantee that a message returned by Redis always corresponds to the first RedisCommand in the stack (see https://blog.csdn.net/weixin_45145848/article/details/103710855).
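The ordering argument can be condensed into a few lines: if commands are queued in send order and replies arrive in the same order, completing the head of the queue for every reply pairs them correctly. The following is a minimal sketch of that idea; `FifoCorrelationSketch`, `send` and `onReply` are hypothetical names, and `CompletableFuture` stands in for the Lettuce command object.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

class FifoCorrelationSketch {

    // Pending commands in the order they were written.
    private final ArrayDeque<CompletableFuture<String>> stack = new ArrayDeque<>();

    // Called when a command is written: remember its future at the tail of the queue.
    CompletableFuture<String> send(String command) {
        CompletableFuture<String> future = new CompletableFuture<>();
        stack.add(future);
        return future;
    }

    // Called once per reply, in arrival order: complete the oldest pending command.
    void onReply(String reply) {
        CompletableFuture<String> head = stack.poll();
        if (head == null) {
            throw new IllegalStateException("reply without a pending command");
        }
        head.complete(reply);
    }

    public static void main(String[] args) {
        FifoCorrelationSketch connection = new FifoCorrelationSketch();
        CompletableFuture<String> set = connection.send("SET key 123");
        CompletableFuture<String> get = connection.send("GET key");
        connection.onReply("OK");
        connection.onReply("123");
        System.out.println(set.join() + " " + get.join());
    }
}
```

No request identifier is needed on the wire as long as every link in the chain preserves order, which is exactly what the list above establishes.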
References

- Netty in Detail, Part 9: Introduction to ByteBuf (longhuihu's blog, CSDN)
- Netty in Detail, Part 9: Using ByteBuf (longhuihu's blog, CSDN)
- Redis Client Lettuce Source Code, Part 4: How Lettuce Reconnects After a Disconnect (杨_同_学's blog, CSDN)