Taking a data insert as the example, this post walks through how Spring operates Redis through Lettuce.
```java
RedisConnection connection = connectionFactory.getConnection();
connection.set("key".getBytes(StandardCharsets.UTF_8), "123".getBytes(StandardCharsets.UTF_8));
```
Here connection is an org.springframework.data.redis.connection.lettuce.LettuceConnection. It implements the RedisConnection interface, which in turn extends RedisCommands, the interface that defines the various Redis operations.
```java
public interface RedisConnection extends RedisCommands, AutoCloseable {
    // ...
}

public interface RedisCommands extends RedisKeyCommands, RedisStringCommands, RedisListCommands, RedisSetCommands,
        RedisZSetCommands, RedisHashCommands, RedisTxCommands, RedisPubSubCommands, RedisConnectionCommands,
        RedisServerCommands, RedisStreamCommands, RedisScriptingCommands, RedisGeoCommands,
        RedisHyperLogLogCommands {

    @Nullable
    Object execute(String command, byte[]... args);
}
```
1. LettuceInvoker operating Lettuce

```java
@Override
public Boolean set(byte[] key, byte[] value) {
    return connection.invoke().from(RedisStringAsyncCommands::set, key, value)
            .get(Converters.stringToBooleanConverter());
}
```
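1.1 Creating the LettuceInvoker object

The LettuceInvoker class is described as follows:

A utility for invoking Lettuce functions. It is typically used to express a method call as a method reference and to pass the method arguments through one of the just or from methods.

The just methods record the method call and evaluate the method result immediately.

The from methods allow composing a pipeline that transforms the result with a converter.

Usage example:

```java
LettuceInvoker invoker = …;

Long result = invoker.just(RedisGeoAsyncCommands::geoadd, key, point.getX(), point.getY(), member);

List<byte[]> result = invoker.fromMany(RedisGeoAsyncCommands::geohash, key, members)
        .toList(it -> it.getValueOrElse(null));
```

Handling of the RedisFuture is delegated to a Synchronizer, which can either await completion or record the future together with its converter for further processing.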
```java
LettuceInvoker invoke() {
    return invoke(getAsyncConnection());
}

LettuceInvoker invoke(RedisClusterAsyncCommands<byte[], byte[]> connection) {
    return doInvoke(connection, false);
}

private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {

    // The lambda is the Synchronizer: it issues the command, waits for the result and converts it.
    return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
        try {
            Object result = await(future.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    });
}

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
getAsyncConnection obtains the asynchronous connection:
```java
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    if (isQueueing() || isPipelined()) {
        return getAsyncDedicatedConnection();
    }
    if (asyncSharedConn != null) {
        if (asyncSharedConn instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
        }
        if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
        }
    }
    return getAsyncDedicatedConnection();
}
```
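When analyzing how a RedisConnection is obtained, we saw that StatefulRedisConnectionImpl holds a RedisAsyncCommandsImpl instance. The async property returned here when fetching the asynchronous connection is that RedisAsyncCommandsImpl object.

Note: how the async object is set and retrieved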
```java
@Override
public RedisAsyncCommands<K, V> async() {
    return async;
}

public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
        Duration timeout) {

    super(writer, timeout);

    this.pushHandler = pushHandler;
    this.codec = codec;
    this.async = newRedisAsyncCommandsImpl();
    this.sync = newRedisSyncCommandsImpl();
    this.reactive = newRedisReactiveCommandsImpl();
}
```
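1.2 The from method of LettuceInvoker

The hard part of reading this source is the combination of @FunctionalInterface functional interfaces and lambda expressions.

Two facts need to be clear first:

When the Redis connection is created, io.lettuce.core.RedisClient#connectStandaloneAsync calls newStatefulRedisConnection, which ends up constructing a StatefulRedisConnectionImpl object. Note its async property.

The connection property of LettuceInvoker is that async object, i.e. the RedisAsyncCommandsImpl instance.

The source below is expanded to show, with an anonymous inner class, a lambda expression and a method reference, several equivalent ways of creating the same object.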
```java
public Boolean set(byte[] key, byte[] value) {

    // Form 1: anonymous inner class
    LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
            new LettuceInvoker.ConnectionFunction2<byte[], byte[], String>() {
                @Override
                public RedisFuture<String> apply(RedisClusterAsyncCommands<byte[], byte[]> connection,
                        byte[] key, byte[] value) {
                    return connection.set(key, value);
                }
            };

    // Form 2: lambda expression (equivalent to form 1)
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function =
    //         (connection, key1, value1) -> connection.set(key1, value1);

    // Form 3: method reference (equivalent to forms 1 and 2)
    // LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = RedisStringAsyncCommands::set;

    return connection.invoke().from(function, key, value)
            .get(Converters.stringToBooleanConverter());
}

@FunctionalInterface
interface ConnectionFunction2<T1, T2, R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, T1 t1, T2 t2);
}

<R, T1, T2> SingleInvocationSpec<R> from(ConnectionFunction2<T1, T2, R> function, T1 t1, T2 t2) {

    Assert.notNull(function, "ConnectionFunction must not be null");

    // Anonymous inner class form: bind t1 and t2, leaving only the connection open
    ConnectionFunction0<R> function2 = new ConnectionFunction0<R>() {
        @Override
        public RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection) {
            return function.apply(connection, t1, t2);
        }
    };

    // Equivalent lambda form:
    // ConnectionFunction0<R> function2 = connection -> function.apply(connection, t1, t2);

    return from(function2);
}

@FunctionalInterface
interface ConnectionFunction0<R> {
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection);
}

<R> SingleInvocationSpec<R> from(ConnectionFunction0<R> function) {

    Assert.notNull(function, "ConnectionFunction must not be null");

    // Anonymous inner class form: bind the connection, leaving a Supplier that runs the command on get()
    Supplier<RedisFuture<R>> parent = new Supplier<RedisFuture<R>>() {
        @Override
        public RedisFuture<R> get() {
            return function.apply(connection);
        }
    };

    // Equivalent lambda form:
    // Supplier<RedisFuture<R>> parent = () -> function.apply(connection);

    return new DefaultSingleInvocationSpec<>(parent, synchronizer);
}
```
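As the code shows, Spring's LettuceInvoker orchestrates the call on RedisClusterAsyncCommands: it assembles a DefaultSingleInvocationSpec object, and only then is the get method of SingleInvocationSpec called. Nothing is sent to Redis while the spec is being assembled.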
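The layered binding described above can be reproduced without Spring or Lettuce. The following is a minimal sketch, assuming `CompletableFuture` as a stand-in for `RedisFuture`; `InvokerSketch`, `FakeAsyncCommands`, `Function2` and `Function0` are hypothetical names that only mirror the shape of the real types. It shows that `from` merely captures arguments, and that the command runs when the resulting `Supplier` is asked for its value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class InvokerSketch {

    // Hypothetical stand-in for RedisClusterAsyncCommands; not the Lettuce type.
    interface FakeAsyncCommands {
        CompletableFuture<String> set(String key, String value);
    }

    // Mirrors the shape of ConnectionFunction2: connection plus two arguments.
    @FunctionalInterface
    interface Function2<T1, T2, R> {
        CompletableFuture<R> apply(FakeAsyncCommands connection, T1 t1, T2 t2);
    }

    // Mirrors the shape of ConnectionFunction0: connection only.
    @FunctionalInterface
    interface Function0<R> {
        CompletableFuture<R> apply(FakeAsyncCommands connection);
    }

    private final FakeAsyncCommands connection;

    InvokerSketch(FakeAsyncCommands connection) {
        this.connection = connection;
    }

    // Step 1: bind the arguments, leaving a function that only needs the connection.
    <R, T1, T2> Supplier<CompletableFuture<R>> from(Function2<T1, T2, R> function, T1 t1, T2 t2) {
        Function0<R> bound = conn -> function.apply(conn, t1, t2);
        return from(bound);
    }

    // Step 2: bind the connection, leaving a Supplier. Nothing has executed yet.
    <R> Supplier<CompletableFuture<R>> from(Function0<R> function) {
        return () -> function.apply(connection);
    }

    // Returns {calls before get(), calls after get(), 1 if the reply was "OK"}.
    static int[] demo() {
        int[] calls = {0};
        FakeAsyncCommands fake = (key, value) -> {
            calls[0]++;
            return CompletableFuture.completedFuture("OK");
        };
        Supplier<CompletableFuture<String>> parent =
                new InvokerSketch(fake).from(FakeAsyncCommands::set, "key", "123");
        int before = calls[0];
        String reply = parent.get().join(); // the fake command runs here
        return new int[] {before, calls[0], "OK".equals(reply) ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("calls before get(): " + r[0] + ", after get(): " + r[1]);
    }
}
```

The deferral is the point of the design: because the invocation is only a `Supplier`, the caller that eventually consumes it can decide whether to wait for the result right away or to handle the future differently.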
1.3 The get method of SingleInvocationSpec

```java
@Override
public <T> T get(Converter<S, T> converter) {

    Assert.notNull(converter, "Converter must not be null");

    return synchronizer.invoke(parent, converter, () -> null);
}
```
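In this test, converter is Converters.stringToBooleanConverter(), which turns the returned "OK" string into Boolean true.

parent is the Supplier of the RedisFuture that was built in the from method.

synchronizer is a LettuceInvoker.Synchronizer. It is the object passed in while the LettuceInvoker was being created.

Here is another way to look at how the LettuceInvoker.Synchronizer object is created: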
```java
LettuceInvoker.Synchronizer synchronizer = new LettuceInvoker.Synchronizer() {
    @Override
    public Object doInvoke(Supplier<RedisFuture<Object>> futureSupplier, Converter<Object, Object> converter,
            Supplier<Object> nullDefault) {
        try {
            Object result = await(futureSupplier.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    }
};

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}
```
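Calling futureSupplier.get() unwinds the layers that LettuceInvoker assembled and finally triggers RedisAsyncCommandsImpl#set(K, V). The source is hard to follow by reading alone; stepping through it in a debugger makes it somewhat clearer.

await then waits for at most timeout milliseconds and takes the result from the RedisFuture.

With that, we finally arrive at the place where the Redis command is actually executed.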
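The await-then-convert behaviour of the synchronizer can be sketched with plain JDK types. This is a simplified illustration, assuming `CompletableFuture` in place of `RedisFuture` and `java.util.function.Function` in place of Spring's `Converter`; `SynchronizerSketch` is a hypothetical name, the exception mapping is invented for the example, and the `isMulti` shortcut of the real code is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

final class SynchronizerSketch {

    // Issues the command, waits up to timeoutMillis for the reply, then converts it.
    static <S, T> T invoke(Supplier<CompletableFuture<S>> futureSupplier,
                           Function<S, T> converter,
                           Supplier<T> nullDefault,
                           long timeoutMillis) {
        CompletableFuture<S> future = futureSupplier.get(); // the command is issued here
        try {
            S result = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return result == null ? nullDefault.get() : converter.apply(result);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the pending command
            throw new IllegalStateException("command timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("command failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<String>> parent = () -> CompletableFuture.completedFuture("OK");
        Function<String, Boolean> okToTrue = "OK"::equals; // plays the role of stringToBooleanConverter
        Boolean stored = invoke(parent, okToTrue, () -> null, 100);
        System.out.println(stored);
    }
}
```

Cancelling on timeout corresponds to the "OrCancel" part of LettuceFutures.awaitOrCancel: a caller that stops waiting should not leave the command pending indefinitely.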
2. Execution flow of the RedisStringAsyncCommands set method

RedisAsyncCommandsImpl extends the abstract class AbstractRedisAsyncCommands, and the core implementation of set lives in that abstract class:
```java
private final RedisCommandBuilder<K, V> commandBuilder;

@Override
public RedisFuture<String> set(K key, V value) {
    return dispatch(commandBuilder.set(key, value));
}
```
2.1 Assembling the Redis command

```java
Command<K, V, String> set(K key, V value) {
    return createCommand(SET, new StatusOutput<>(codec), key, value);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
    CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addValue(value);
    return createCommand(type, output, args);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output,
        CommandArgs<K, V> args) {
    return new Command<>(type, output, args);
}

public class Command<K, V, T> implements RedisCommand<K, V, T> {

    private final ProtocolKeyword type;

    protected CommandArgs<K, V> args;

    protected CommandOutput<K, V, T> output;
}
```
2.2 Sending the Redis command

```java
// AbstractRedisAsyncCommands: wrap the command in an AsyncCommand and hand it to the connection
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
    RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
    if (dispatched instanceof AsyncCommand) {
        return (AsyncCommand<K, V, T>) dispatched;
    }
    return asyncCommand;
}

// StatefulRedisConnectionImpl: pre-process the command, then delegate to the superclass
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
    RedisCommand<K, V, T> toSend = preProcessCommand(command);
    potentiallyEnableMulti(command);
    return super.dispatch(toSend);
}

// RedisChannelHandler: write the command to the channelWriter
private final RedisChannelWriter channelWriter;

protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    if (debugEnabled) {
        logger.debug("dispatching command {}", cmd);
    }
    if (tracingEnabled) {
        RedisCommand<K, V, T> commandToSend = cmd;
        TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
        if (provider == null) {
            commandToSend = new TracedCommand<>(cmd,
                    clientResources.tracing().initialTraceContextProvider().getTraceContext());
        }
        return channelWriter.write(commandToSend);
    }
    return channelWriter.write(cmd);
}
```
First, what is channelWriter? Tracing upward from the RedisChannelHandler constructor leads back to io.lettuce.core.RedisClient#connectStandaloneAsync, the method that obtains the Redis connection:
```java
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {

    assertNotNull(codec);
    checkValidRedisURI(redisURI);

    logger.debug("Trying to get a Redis connection for: {}", redisURI);

    DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
    RedisChannelWriter writer = endpoint;

    if (CommandExpiryWriter.isSupported(getOptions())) {
        writer = new CommandExpiryWriter(writer, getOptions(), getResources());
    }

    if (CommandListenerWriter.isSupported(getCommandListeners())) {
        writer = new CommandListenerWriter(writer, getCommandListeners());
    }

    StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);
    ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
            () -> new CommandHandler(getOptions(), getResources(), endpoint));

    future.whenComplete((channelHandler, throwable) -> {
        if (throwable != null) {
            connection.closeAsync();
        }
    });

    return future;
}
```
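The source shows that channelWriter is the DefaultEndpoint object, possibly wrapped by decorators (CommandExpiryWriter, CommandListenerWriter) that add features such as command expiry. Next, the write method of channelWriter: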
```java
@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

    LettuceAssert.notNull(command, "Command must not be null");

    RedisException validation = validateWrite(1);
    if (validation != null) {
        command.completeExceptionally(validation);
        return command;
    }

    try {
        sharedLock.incrementWriters();

        if (inActivation) {
            command = processActivationCommand(command);
        }

        if (autoFlushCommands) {
            if (isConnected()) {
                writeToChannelAndFlush(command);
            } else {
                writeToDisconnectedBuffer(command);
            }
        } else {
            writeToBuffer(command);
        }
    } finally {
        sharedLock.decrementWriters();
        if (debugEnabled) {
            logger.debug("{} write() done", logPrefix());
        }
    }

    return command;
}

private static final AtomicIntegerFieldUpdater<DefaultEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater
        .newUpdater(DefaultEndpoint.class, "queueSize");

@SuppressWarnings("unused")
private volatile int queueSize = 0;

private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {

    QUEUE_SIZE.incrementAndGet(this);

    ChannelFuture channelFuture = channelWriteAndFlush(command);

    if (reliability == Reliability.AT_MOST_ONCE) {
        channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
    }

    if (reliability == Reliability.AT_LEAST_ONCE) {
        channelFuture.addListener(RetryListener.newInstance(this, command));
    }
}
```
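AtomicIntegerFieldUpdater is used to update the int field queueSize of the DefaultEndpoint object atomically. The field does not have to be declared as an AtomicInteger; it only has to be a volatile int.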
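To see the pattern in isolation, here is a small self-contained sketch that uses only the JDK. `QueueSizeSketch` and its methods are hypothetical names chosen for illustration; only the updater declaration follows the shape seen in DefaultEndpoint.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class QueueSizeSketch {

    // One static updater serves every instance; each instance only pays for a plain int field.
    private static final AtomicIntegerFieldUpdater<QueueSizeSketch> QUEUE_SIZE =
            AtomicIntegerFieldUpdater.newUpdater(QueueSizeSketch.class, "queueSize");

    @SuppressWarnings("unused") // accessed reflectively through QUEUE_SIZE
    private volatile int queueSize = 0;

    int enqueue() {
        return QUEUE_SIZE.incrementAndGet(this);
    }

    int dequeue() {
        return QUEUE_SIZE.decrementAndGet(this);
    }

    int size() {
        return QUEUE_SIZE.get(this);
    }

    // Increments the counter from several threads and returns the final value.
    static int countConcurrently(int threads, int perThread) {
        QueueSizeSketch endpoint = new QueueSizeSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    endpoint.enqueue();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return endpoint.size();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 10_000)); // prints 40000
    }
}
```

Compared with an AtomicInteger field, the updater avoids allocating one extra object per instance, which can matter for classes that are created in large numbers.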
```java
protected volatile Channel channel;

private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {

    if (debugEnabled) {
        logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
    }

    return channel.writeAndFlush(command);
}
```
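At this point the Redis command has been written to Netty's Channel. The channel places the command on the task queue of its EventLoop, and the calling thread's part of the flow ends there. The EventLoop is a SingleThreadEventExecutor; it calls back the write method of the CommandHandler that was configured when Netty's Bootstrap was created. CommandHandler extends io.netty.channel.ChannelDuplexHandler. How the channel is created, how Channel relates to EventLoop, and how Netty processes events are left for a later study of Netty.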
2.3 CommandHandler writes the command to Redis

```java
@Override
@SuppressWarnings("unchecked")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

    if (debugEnabled) {
        logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
    }

    if (msg instanceof RedisCommand) {
        writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
        return;
    }

    if (msg instanceof List) {
        List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;
        if (batch.size() == 1) {
            writeSingleCommand(ctx, batch.get(0), promise);
            return;
        }
        writeBatch(ctx, batch, promise);
        return;
    }

    if (msg instanceof Collection) {
        writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
    }
}

private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {

    if (!isWriteable(command)) {
        promise.trySuccess();
        return;
    }

    // Remember the command so that the reply can be matched to it later
    addToStack(command, promise);
    attachTracing(ctx, command);
    ctx.write(command, promise);
}

private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {

    try {
        validateWrite(1);

        if (command.getOutput() == null) {
            // Fire-and-forget command
            complete(command);
        }

        RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);

        if (promise.isVoid()) {
            stack.add(redisCommand);
        } else {
            promise.addListener(AddToStack.newInstance(stack, redisCommand));
        }
    } catch (Exception e) {
        command.completeExceptionally(e);
        throw e;
    }
}
```
Only at this step is the Redis command actually passed down the Netty pipeline and sent to Redis.
2.4 CommandHandler receives the command result from Redis

When Netty receives a message from the Redis server, it calls back the channelRead method of CommandHandler:
```java
private ByteBuf buffer;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    ByteBuf input = (ByteBuf) msg;
    input.touch("CommandHandler.read(…)");

    if (!input.isReadable() || input.refCnt() == 0) {
        logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt());
        return;
    }

    if (debugEnabled) {
        logger.debug("{} Received: {} bytes, {} commands in the stack", logPrefix(), input.readableBytes(),
                stack.size());
    }

    try {
        if (buffer.refCnt() < 1) {
            logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
            return;
        }

        if (debugEnabled && ctx.channel() != channel) {
            logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel());
            return;
        }

        if (traceEnabled) {
            logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
        }

        buffer.touch("CommandHandler.read(…)");
        buffer.writeBytes(input);

        decode(ctx, buffer);
    } finally {
        input.release();
    }
}

private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {

    // The reply belongs to the oldest command that is still waiting
    RedisCommand<?, ?, ?> command = stack.peek();

    try {
        if (!decode(ctx, buffer, command)) {
            hasDecodeProgress = true;
            decodeBufferPolicy.afterPartialDecode(buffer);
            return;
        }
    } catch (Exception e) {
        ctx.close();
        throw e;
    }
}
```
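Once decoding is complete, command.complete() is called, i.e. io.lettuce.core.protocol.AsyncCommand#complete.

Its core logic is to put the result obtained from Redis into the CompletableFuture, so that the LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS) call from section 1.3 can pick it up.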
```java
// CommandHandler: delegate decoding to the RedisStateMachine
private RedisStateMachine rsm;

protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
    return rsm.decode(buffer, output, command::completeExceptionally);
}

// RedisStateMachine
public boolean decode(ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) {

    buffer.touch("RedisStateMachine.decode(…)");

    if (isEmpty(stack)) {
        add(stack, new State());
    }

    if (output == null) {
        return isEmpty(stack);
    }

    boolean resp3Indicator = doDecode(buffer, output, errorHandler);

    if (debugEnabled) {
        logger.debug("Decode done, empty stack: {}", isEmpty(stack));
    }

    if (isDiscoverProtocol()) {
        if (resp3Indicator) {
            setProtocolVersion(ProtocolVersion.RESP3);
        } else {
            setProtocolVersion(ProtocolVersion.RESP2);
        }
    }

    return isEmpty(stack);
}

private boolean doDecode(ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) {

    boolean resp3Indicator = false;
    State.Result result;

    while (!isEmpty(stack)) {
        State state = peek(stack);

        if (state.type == null) {
            if (!buffer.isReadable()) {
                break;
            }
            state.type = readReplyType(buffer);
            if (state.type == HELLO_V3 || state.type == MAP) {
                resp3Indicator = true;
            }
            buffer.markReaderIndex();
        }

        result = state.type.handle(this, state, buffer, output, errorHandler);
        if (State.Result.BREAK_LOOP.equals(result)) {
            break;
        } else if (State.Result.CONTINUE_LOOP.equals(result)) {
            continue;
        }
        buffer.markReaderIndex();
        remove(stack);

        output.complete(size(stack));
    }

    return resp3Indicator;
}

static State.Result handleSingle(RedisStateMachine rsm, State state, ByteBuf buffer, CommandOutput<?, ?, ?> output,
        Consumer<Exception> errorHandler) {
    ByteBuffer bytes;

    // An incomplete line means more bytes are needed before decoding can continue
    if ((bytes = rsm.readLine(buffer)) == null) {
        return State.Result.BREAK_LOOP;
    }

    if (!QUEUED.equals(bytes)) {
        rsm.safeSetSingle(output, bytes, errorHandler);
    }
    return State.Result.NORMAL_END;
}

protected void safeSetSingle(CommandOutput<?, ?, ?> output, ByteBuffer bytes, Consumer<Exception> errorHandler) {
    try {
        output.set(bytes);
    } catch (Exception e) {
        errorHandler.accept(e);
    }
}

// The output used by SET (see section 2.1)
public class StatusOutput<K, V> extends CommandOutput<K, V, String> {

    private static final ByteBuffer OK = StandardCharsets.US_ASCII.encode("OK");

    public StatusOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        output = OK.equals(bytes) ? "OK" : decodeAscii(bytes);
    }
}
```
In short: once the response from Redis arrives, it is decoded and finally written into the output property of the "Redis command" object.
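For the SET command the reply on the wire is the RESP simple string `+OK\r\n`. The sketch below is a deliberately reduced illustration of that one reply type using `java.nio.ByteBuffer`; `StatusReplySketch` and `decodeStatus` are hypothetical names, and the real RedisStateMachine handles many more reply types and works on Netty's ByteBuf. Returning null for an incomplete line plays the same role as BREAK_LOOP in handleSingle.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class StatusReplySketch {

    // Decodes a RESP simple string such as "+OK\r\n".
    // Returns null, leaving the buffer position untouched, when the line is not complete yet.
    static String decodeStatus(ByteBuffer buffer) {
        int start = buffer.position();
        if (!buffer.hasRemaining()) {
            return null;
        }
        if (buffer.get(start) != '+') {
            // Other reply types are out of scope for this sketch.
            throw new IllegalArgumentException("not a simple string reply");
        }
        for (int i = start + 1; i < buffer.limit() - 1; i++) {
            if (buffer.get(i) == '\r' && buffer.get(i + 1) == '\n') {
                byte[] line = new byte[i - (start + 1)];
                buffer.position(start + 1);
                buffer.get(line);
                buffer.position(i + 2); // consume the trailing CRLF
                return new String(line, StandardCharsets.US_ASCII);
            }
        }
        return null; // partial reply: wait for more bytes
    }

    public static void main(String[] args) {
        ByteBuffer reply = ByteBuffer.wrap("+OK\r\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(decodeStatus(reply)); // prints OK
    }
}
```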
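2.5 How does Lettuce guarantee that each command is matched with its own result?

- Each Lettuce connection uses a single TCP connection to Redis.
- Lettuce puts Redis commands on the stack in order (the code mainly relies on ArrayDeque<RedisCommand<?, ?, ?>>, an array-backed double-ended queue).
- The TCP protocol itself delivers bytes in order.
- Redis processes requests on a single thread, so the messages it returns are in order as well.
- Disconnects are handled by Lettuce's reconnect mechanism, ConnectionWatchdog.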
Together these guarantee that a message returned by Redis always corresponds to the first RedisCommand in the stack. (See https://blog.csdn.net/weixin_45145848/article/details/103710855)
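The ordering argument above can be condensed into a small sketch: pending commands wait in a FIFO queue, and every incoming reply completes the oldest one. This is a simplification that assumes `CompletableFuture` in place of AsyncCommand and plain strings in place of encoded commands and replies; `FifoMatchingSketch`, `write` and `onReply` are hypothetical names, and the class is not thread-safe (the real CommandHandler runs on a single EventLoop thread).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class FifoMatchingSketch {

    // Commands that were written but have not received a reply yet, oldest first.
    private final Queue<CompletableFuture<String>> stack = new ArrayDeque<>();

    // Write path: remember the command before it goes on the wire.
    // The command text itself is not used by this sketch.
    CompletableFuture<String> write(String command) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        stack.add(pending);
        return pending;
    }

    // Read path: the next reply always belongs to the oldest pending command.
    void onReply(String reply) {
        CompletableFuture<String> oldest = stack.poll();
        if (oldest == null) {
            throw new IllegalStateException("reply without a pending command: " + reply);
        }
        oldest.complete(reply);
    }

    public static void main(String[] args) {
        FifoMatchingSketch handler = new FifoMatchingSketch();
        CompletableFuture<String> set = handler.write("SET key 123");
        CompletableFuture<String> get = handler.write("GET key");

        handler.onReply("OK");
        handler.onReply("123");

        System.out.println(set.join() + " / " + get.join()); // prints OK / 123
    }
}
```

No request identifier is needed in this scheme: as long as both sides preserve order on one connection, the position in the queue is the correlation key.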