// Obtain a connection from the factory and store the value "123" under "key";
// both are passed as UTF-8 encoded byte arrays.
// RedisConnection is AutoCloseable, so try-with-resources closes it even when set(...) throws.
try (RedisConnection connection = connectionFactory.getConnection()) {
    connection.set("key".getBytes(StandardCharsets.UTF_8), "123".getBytes(StandardCharsets.UTF_8));
}
1 2 3 4 5 6 public interface RedisConnection extends RedisCommands , AutoCloseable {public interface RedisCommands extends RedisKeyCommands , RedisStringCommands, RedisListCommands, RedisSetCommands, RedisZSetCommands, RedisHashCommands, RedisTxCommands, RedisPubSubCommands, RedisConnectionCommands, RedisServerCommands, RedisStreamCommands, RedisScriptingCommands, RedisGeoCommands, RedisHyperLogLogCommands { @Nullable Object execute (String command, byte []... args) ; }
1、LettuceInvoker操作Lettuce
/**
 * Invokes {@code RedisStringAsyncCommands::set} with {@code key} and {@code value} through the
 * connection's invoker and converts the reply with {@code Converters.stringToBooleanConverter()}.
 *
 * @param key   the key bytes
 * @param value the value bytes
 * @return the converted reply
 */
@Override
public Boolean set(byte[] key, byte[] value) {
    return connection.invoke()
            .from(RedisStringAsyncCommands::set, key, value)
            .get(Converters.stringToBooleanConverter());
}
1.1、LettuceInvoker对象的创建 LettuceInvoker类的解释:
用于对Lettuce方法进行函数式调用的工具类。通常用于将方法调用表示为方法引用,并通过just或from方法之一传递方法参数。
from方法允许组成一个函数式的pipeline,并使用转换器(Converter)对结果进行转换。
Usage example:
LettuceInvoker invoker = …;
Long result = invoker.just(RedisGeoAsyncCommands::geoadd, key, point.getX(), point.getY(), member);
List<byte[]> result = invoker.fromMany(RedisGeoAsyncCommands::geohash, key, members).toList(it -> it.getValueOrElse(null));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 LettuceInvoker invoke () { return invoke(getAsyncConnection()); } LettuceInvoker invoke (RedisClusterAsyncCommands<byte [], byte []> connection) { return doInvoke(connection, false ); } private LettuceInvoker doInvoke (RedisClusterAsyncCommands<byte [], byte []> connection, boolean statusCommand) { return new LettuceInvoker (connection, (future, converter, nullDefault) -> { try { Object result = await(future.get()); if (result == null ) { return nullDefault.get(); } return converter.convert(result); } catch (Exception ex) { throw convertLettuceAccessException(ex); } }); } @Nullable private <T> T await (RedisFuture<T> cmd) { if (isMulti) { return null ; } try { return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS); } catch (RuntimeException e) { throw convertLettuceAccessException(e); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 RedisClusterAsyncCommands<byte [], byte []> getAsyncConnection() { if (isQueueing() || isPipelined()) { return getAsyncDedicatedConnection(); } if (asyncSharedConn != null ) { if (asyncSharedConn instanceof StatefulRedisConnection) { return ((StatefulRedisConnection<byte [], byte []>) asyncSharedConn).async(); } if (asyncSharedConn instanceof StatefulRedisClusterConnection) { return ((StatefulRedisClusterConnection<byte [], byte []>) asyncSharedConn).async(); } } return getAsyncDedicatedConnection(); }
/** Returns the async command API that was created in the constructor. */
@Override
public RedisAsyncCommands<K, V> async() {
    return async;
}

/**
 * Stores the push handler and codec, then creates the async, sync and reactive command APIs
 * (in that order) for this connection.
 *
 * @param writer      passed to the superclass constructor
 * @param pushHandler stored in {@code this.pushHandler}
 * @param codec       stored in {@code this.codec}
 * @param timeout     passed to the superclass constructor
 */
public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
        Duration timeout) {
    super(writer, timeout);
    this.pushHandler = pushHandler;
    this.codec = codec;
    this.async = newRedisAsyncCommandsImpl();
    this.sync = newRedisSyncCommandsImpl();
    this.reactive = newRedisReactiveCommandsImpl();
}
1.2、LettuceInvoker的from方法 这里阅读源码的难点在于:@FunctionalInterface函数式接口和lambda表达式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public Boolean set (byte [] key, byte [] value) { LettuceInvoker.ConnectionFunction2<byte [], byte [], String> function = new LettuceInvoker .ConnectionFunction2<byte [], byte [], String>() { @Override public RedisFuture<String> apply (RedisClusterAsyncCommands<byte [], byte []> connection, byte [] key, byte [] value) { return connection.set(key,value); } }; LettuceInvoker.ConnectionFunction2<byte [], byte [], String> function = (connection, key1, value1) -> connection.set(key1, value1); LettuceInvoker.ConnectionFunction2<byte [], byte [], String> function = RedisStringAsyncCommands::set; return connection.invoke().from(function, key, value) .get(Converters.stringToBooleanConverter()); } @FunctionalInterface interface ConnectionFunction2 <T1, T2, R> { RedisFuture<R> apply (RedisClusterAsyncCommands<byte [], byte []> connection, T1 t1, T2 t2) ; } <R, T1, T2> SingleInvocationSpec<R> from (ConnectionFunction2<T1, T2, R> function, T1 t1, T2 t2) { Assert.notNull(function, "ConnectionFunction must not be null" ); ConnectionFunction0<R> function2 = new ConnectionFunction0 <R>() { @Override public RedisFuture<R> apply (RedisClusterAsyncCommands<byte [], byte []> connection) { return function.apply(connection, t1, t2); } }; ConnectionFunction0<R> function2 = connection -> function.apply(connection, t1, t2); return from(function2); } @FunctionalInterface interface ConnectionFunction0 <R> { RedisFuture<R> apply (RedisClusterAsyncCommands<byte [], byte []> connection) ; } <R> SingleInvocationSpec<R> from (ConnectionFunction0<R> function) { Assert.notNull(function, "ConnectionFunction must not be null" ); Supplier<RedisFuture<R>> parent = new Supplier <RedisFuture<R>>() { @Override public RedisFuture<R> get () { return function.apply(connection); } }; Supplier<RedisFuture<R>> parent = () -> function.apply(connection); return 
new DefaultSingleInvocationSpec <>(parent, synchronizer); }
1.3、SingleInvocationSpec的get方法
/**
 * Hands the stored future supplier ({@code parent}) and {@code converter} to the synchronizer,
 * with a null-default supplier that yields {@code null}.
 *
 * @param converter converter applied to the result; must not be {@code null}
 * @return whatever {@code synchronizer.invoke(...)} returns
 */
@Override
public <T> T get(Converter<S, T> converter) {
    Assert.notNull(converter, "Converter must not be null");
    return synchronizer.invoke(parent, converter, () -> null);
}
测试时,这里的converter为Converters.stringToBooleanConverter():将返回的“OK”字符串转Boolean true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 LettuceInvoker.Synchronizer synchronizer = new LettuceInvoker .Synchronizer() { @Override public Object doInvoke (Supplier<RedisFuture<Object>> futureSupplier, Converter<Object, Object> converter, Supplier<Object> nullDefault) { try { Object result = await(futureSupplier.get()); if (result == null ) { return nullDefault.get(); } return converter.convert(result); } catch (Exception ex) { throw convertLettuceAccessException(ex); } } }; @Nullable private <T> T await (RedisFuture<T> cmd) { if (isMulti) { return null ; } try { return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS); } catch (RuntimeException e) { throw convertLettuceAccessException(e); } }
通过执行futureSupplier.get(),通过LettuceInvoker层层编排,最终触发RedisAsyncCommandsImpl#set(K, V)方法。源码看起来比较费劲,通过debug一步步操作,能清晰一点点。
2、RedisStringAsyncCommands的set方法执行流程 RedisAsyncCommandsImpl继承了AbstractRedisAsyncCommands抽象类,set方法的核心实现都在该抽象类中:
1 2 3 4 5 6 7 8 9 private final RedisCommandBuilder<K, V> commandBuilder; @Override public RedisFuture<String> set (K key, V value) { return dispatch(commandBuilder.set(key, value)); }
2.1、Redis命令组装
/** Creates the SET command with a {@code StatusOutput} for the reply. */
Command<K, V, String> set(K key, V value) {
    return createCommand(SET, new StatusOutput<>(codec), key, value);
}

/** Packs {@code key} and {@code value} into {@code CommandArgs} and creates the command. */
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
    CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addValue(value);
    return createCommand(type, output, args);
}

/** Creates the {@code Command} from its type, output and arguments. */
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output,
        CommandArgs<K, V> args) {
    return new Command<>(type, output, args);
}

/**
 * A command consists of its type, its arguments and the output that receives the reply.
 * Constructor and methods are not shown in this excerpt.
 */
public class Command<K, V, T> implements RedisCommand<K, V, T> {

    private final ProtocolKeyword type;

    protected CommandArgs<K, V> args;

    protected CommandOutput<K, V, T> output;
}
2.2、Redis命令发送
// The three dispatch(...) methods below belong to different classes along the call chain;
// this excerpt lists them in call order.

/**
 * Wraps {@code cmd} in an {@code AsyncCommand} and dispatches it on the connection.
 *
 * @return the dispatched command if it is an {@code AsyncCommand}, otherwise the wrapper
 *         created here
 */
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
    RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
    if (dispatched instanceof AsyncCommand) {
        return (AsyncCommand<K, V, T>) dispatched;
    }
    return asyncCommand;
}

/** Pre-processes the command, runs {@code potentiallyEnableMulti}, then delegates to the superclass. */
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
    RedisCommand<K, V, T> toSend = preProcessCommand(command);
    potentiallyEnableMulti(command);
    return super.dispatch(toSend);
}

// Writer that the command is finally handed to.
private final RedisChannelWriter channelWriter;

/**
 * Writes the command to {@code channelWriter}. With tracing enabled, a command that does not
 * already carry a {@code TraceContextProvider} is wrapped in a {@code TracedCommand} first.
 */
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    if (debugEnabled) {
        logger.debug("dispatching command {}", cmd);
    }
    if (tracingEnabled) {
        RedisCommand<K, V, T> commandToSend = cmd;
        TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
        if (provider == null) {
            commandToSend = new TracedCommand<>(cmd,
                    clientResources.tracing().initialTraceContextProvider().getTraceContext());
        }
        return channelWriter.write(commandToSend);
    }
    return channelWriter.write(cmd);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync (RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { assertNotNull(codec); checkValidRedisURI(redisURI); logger.debug("Trying to get a Redis connection for: {}" , redisURI); DefaultEndpoint endpoint = new DefaultEndpoint (getOptions(), getResources()); RedisChannelWriter writer = endpoint; if (CommandExpiryWriter.isSupported(getOptions())) { writer = new CommandExpiryWriter (writer, getOptions(), getResources()); } if (CommandListenerWriter.isSupported(getCommandListeners())) { writer = new CommandListenerWriter (writer, getCommandListeners()); } StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout); ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI, () -> new CommandHandler (getOptions(), getResources(), endpoint)); future.whenComplete((channelHandler, throwable) -> { if (throwable != null ) { connection.closeAsync(); } }); return future; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 @Override public <K, V, T> RedisCommand<K, V, T> write (RedisCommand<K, V, T> command) { LettuceAssert.notNull(command, "Command must not be null" ); RedisException validation = validateWrite(1 ); if (validation != null ) { command.completeExceptionally(validation); return command; } try { sharedLock.incrementWriters(); if (inActivation) { command = processActivationCommand(command); } if (autoFlushCommands) { if (isConnected()) { writeToChannelAndFlush(command); } else { writeToDisconnectedBuffer(command); } } else { writeToBuffer(command); } } finally { sharedLock.decrementWriters(); if (debugEnabled) { logger.debug("{} write() done" , logPrefix()); } } return command; } private static final AtomicIntegerFieldUpdater<DefaultEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater.newUpdater(DefaultEndpoint.class, "queueSize" ); @SuppressWarnings("unused") private volatile int queueSize = 0 ;private void writeToChannelAndFlush (RedisCommand<?, ?, ?> command) { QUEUE_SIZE.incrementAndGet(this ); ChannelFuture channelFuture = channelWriteAndFlush(command); if (reliability == Reliability.AT_MOST_ONCE) { channelFuture.addListener(AtMostOnceWriteListener.newInstance(this , command)); } if (reliability == Reliability.AT_LEAST_ONCE) { channelFuture.addListener(RetryListener.newInstance(this , command)); } }
1 2 3 4 5 6 7 8 9 protected volatile Channel channel;private ChannelFuture channelWriteAndFlush (RedisCommand<?, ?, ?> command) { if (debugEnabled) { logger.debug("{} write() writeAndFlush command {}" , logPrefix(), command); } return channel.writeAndFlush(command); }
2.3、CommandHandler写命令到Redis
/**
 * Outbound write entry point. Dispatches on the message type: a single {@code RedisCommand},
 * a {@code List} of commands (a one-element list is written as a single command), or any other
 * {@code Collection} of commands.
 */
@Override
@SuppressWarnings("unchecked")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (debugEnabled) {
        logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
    }

    if (msg instanceof RedisCommand) {
        writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
        return;
    }

    if (msg instanceof List) {
        List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;
        if (batch.size() == 1) {
            writeSingleCommand(ctx, batch.get(0), promise);
            return;
        }
        writeBatch(ctx, batch, promise);
        return;
    }

    if (msg instanceof Collection) {
        writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
    }
}

/**
 * Writes one command. A command that is not writeable only marks the promise as succeeded;
 * otherwise the command is added to the stack before it is passed on via {@code ctx.write}.
 */
private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {
    if (!isWriteable(command)) {
        promise.trySuccess();
        return;
    }
    addToStack(command, promise);
    attachTracing(ctx, command);
    ctx.write(command, promise);
}

// Commands that have been written; decode(...) peeks the head of this queue.
private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

/**
 * Adds the command to the stack: immediately for a void promise, otherwise through a listener
 * registered on the promise. On failure the command is completed exceptionally and the
 * exception is rethrown.
 */
private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {
    try {
        validateWrite(1);

        if (command.getOutput() == null) {
            // No output set: the command is completed right away.
            complete(command);
        }

        RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);

        if (promise.isVoid()) {
            stack.add(redisCommand);
        } else {
            promise.addListener(AddToStack.newInstance(stack, redisCommand));
        }
    } catch (Exception e) {
        command.completeExceptionally(e);
        throw e;
    }
}
2.4、CommandHandler从Redis获取命令执行结果 Netty在收到Redis服务端返回的消息之后就会回调CommandHandler的channelRead方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private ByteBuf buffer;@Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf input = (ByteBuf) msg; input.touch("CommandHandler.read(…)" ); if (!input.isReadable() || input.refCnt() == 0 ) { logger.warn("{} Input not readable {}, {}" , logPrefix(), input.isReadable(), input.refCnt()); return ; } if (debugEnabled) { logger.debug("{} Received: {} bytes, {} commands in the stack" , logPrefix(), input.readableBytes(), stack.size()); } try { if (buffer.refCnt() < 1 ) { logger.warn("{} Ignoring received data for closed or abandoned connection" , logPrefix()); return ; } if (debugEnabled && ctx.channel() != channel) { logger.debug("{} Ignoring data for a non-registered channel {}" , logPrefix(), ctx.channel()); return ; } if (traceEnabled) { logger.trace("{} Buffer: {}" , logPrefix(), input.toString(Charset.defaultCharset()).trim()); } buffer.touch("CommandHandler.read(…)" ); buffer.writeBytes(input); decode(ctx, buffer); } finally { input.release(); } } private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque <>();protected void decode (ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException { RedisCommand<?, ?, ?> command = stack.peek(); try { if (!decode(ctx, buffer, command)) { hasDecodeProgress = true ; decodeBufferPolicy.afterPartialDecode(buffer); return ; } } catch (Exception e) { ctx.close(); throw e; } }
核心逻辑就是将从redis中获取的结果塞到CompletableFuture中,便于1.3小节中LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS)获取结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque <>();protected void decode (ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException { RedisCommand<?, ?, ?> command = stack.peek(); try { if (!decode(ctx, buffer, command)) { hasDecodeProgress = true ; decodeBufferPolicy.afterPartialDecode(buffer); return ; } } catch (Exception e) { ctx.close(); throw e; } } private RedisStateMachine rsm;protected boolean decode (ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) { return rsm.decode(buffer, output, command::completeExceptionally); } public boolean decode (ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) { buffer.touch("RedisStateMachine.decode(…)" ); if (isEmpty(stack)) { add(stack, new State ()); } if (output == null ) { return isEmpty(stack); } boolean resp3Indicator = doDecode(buffer, output, errorHandler); if (debugEnabled) { logger.debug("Decode done, empty stack: {}" , isEmpty(stack)); } if (isDiscoverProtocol()) { if (resp3Indicator) { setProtocolVersion(ProtocolVersion.RESP3); } else { setProtocolVersion(ProtocolVersion.RESP2); } } return isEmpty(stack); } private boolean doDecode (ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) { boolean resp3Indicator = false ; State.Result result; while (!isEmpty(stack)) { State state = peek(stack); if (state.type == null ) { if (!buffer.isReadable()) { break ; } state.type = readReplyType(buffer); if (state.type == HELLO_V3 || state.type == MAP) { resp3Indicator = true ; } 
buffer.markReaderIndex(); } result = state.type.handle(this , state, buffer, output, errorHandler); if (State.Result.BREAK_LOOP.equals(result)) { break ; } else if (State.Result.CONTINUE_LOOP.equals(result)) { continue ; } buffer.markReaderIndex(); remove(stack); output.complete(size(stack)); } return resp3Indicator; } static State.Result handleSingle (RedisStateMachine rsm, State state, ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) { ByteBuffer bytes; if ((bytes = rsm.readLine(buffer)) == null ) { return State.Result.BREAK_LOOP; } if (!QUEUED.equals(bytes)) { rsm.safeSetSingle(output, bytes, errorHandler); } return State.Result.NORMAL_END; } protected void safeSetSingle (CommandOutput<?, ?, ?> output, ByteBuffer bytes, Consumer<Exception> errorHandler) { try { output.set(bytes); } catch (Exception e) { errorHandler.accept(e); } } public class StatusOutput <K, V> extends CommandOutput <K, V, String> { private static final ByteBuffer OK = StandardCharsets.US_ASCII.encode("OK" ); public StatusOutput (RedisCodec<K, V> codec) { super (codec, null ); } @Override public void set (ByteBuffer bytes) { output = OK.equals(bytes) ? "OK" : decodeAscii(bytes); } }
Lettuce将redis命令放入stack时是有序的(代码中主要使用ArrayDeque<RedisCommand<?, ?, ?>>,即基于数组的双端队列来实现该功能)。
<https://blog.csdn.net/weixin_45145848/article/details/103710855>
参考 Netty详解之九:ByteBuf介绍_longhuihu的博客-CSDN博客_bytebuf
Netty详解之九:使用ByteBuf_longhuihu的博客-CSDN博客_bytebuf retain
Redis客户端Lettuce源码【四】Lettuce是如何断线重连的_杨_同_学的博客-CSDN博客_lettuce 自动重连