以插入数据为例,分析下spring通过lettuce操作redis。

1
2
3
4
//获取LettuceConnection
RedisConnection connection = connectionFactory.getConnection();
//set操作
connection.set("key".getBytes(StandardCharsets.UTF_8), "123".getBytes(StandardCharsets.UTF_8));

这里的connection为:org.springframework.data.redis.connection.lettuce.LettuceConnection,其实现RedisConnection接口、实现RedisCommands接口,定义了redis的各种操作。

1
2
3
4
5
6
public interface RedisConnection extends RedisCommands, AutoCloseable {//略}
public interface RedisCommands extends RedisKeyCommands, RedisStringCommands, RedisListCommands, RedisSetCommands,
RedisZSetCommands, RedisHashCommands, RedisTxCommands, RedisPubSubCommands, RedisConnectionCommands,
RedisServerCommands, RedisStreamCommands, RedisScriptingCommands, RedisGeoCommands, RedisHyperLogLogCommands { @Nullable
Object execute(String command, byte[]... args);
}

1、LettuceInvoker操作Lettuce

1
2
3
4
5
6
7
@Override
public Boolean set(byte[] key, byte[] value) {
//略
//这里的connection还是LettuceConnection
return connection.invoke().from(RedisStringAsyncCommands::set, key, value)
.get(Converters.stringToBooleanConverter());
}

1.1、LettuceInvoker对象的创建

LettuceInvoker类的解释:

用户Lettuce方法的函数调用. 通常用于将方法调用表示为方法引用,并通过just或from方法之一传递方法参数。

  • just方法记录方法调用并立即评估方法结果。
  • from方法允许组成一个pipeline ,使用转换器转换结果。

Usage example:

LettuceInvoker invoker = …;

Long result = invoker.just(RedisGeoAsyncCommands::geoadd, key, point.getX(), point.getY(), member);

List<byte[]> result = invoker.fromMany(RedisGeoAsyncCommands::geohash, key, members).toList(it -> it.getValueOrElse(null));

RedisFuture委托给LettuceInvoker管理。

Synchronizer可以等待完成,也可以记录转换器的future,以便进一步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
LettuceInvoker invoke() {
//getAsyncConnection获取异步连接
return invoke(getAsyncConnection());
}

LettuceInvoker invoke(RedisClusterAsyncCommands<byte[], byte[]> connection) {
return doInvoke(connection, false);
}
private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {
//省略一些代码
//注意这里LettuceInvoker的第二个参数LettuceInvoker.Synchronizer
//future为:Supplier<RedisFuture<Object>>
//converter为:Converter<Object, Object>
//nullDefault为:Supplier<Object>
return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
try {
Object result = await(future.get());
if (result == null) {
return nullDefault.get();
}
return converter.convert(result);
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
});
}
@Nullable
private <T> T await(RedisFuture<T> cmd) {
if (isMulti) {
return null;
}
try {
return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
} catch (RuntimeException e) {
throw convertLettuceAccessException(e);
}
}

getAsyncConnection获取异步连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
if (isQueueing() || isPipelined()) {
return getAsyncDedicatedConnection();
}
if (asyncSharedConn != null) {
if (asyncSharedConn instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
}
if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
}
}
return getAsyncDedicatedConnection();
}

在分析获取RedisConnection的时候,了解到StatefulRedisConnectionImpl中维护了RedisAsyncCommandsImpl的实现对象。这里异步获取连接的async属性就是RedisAsyncCommandsImpl对象。

备注:async对象设置与获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public RedisAsyncCommands<K, V> async() {
return async;
}

public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
Duration timeout) {

super(writer, timeout);

this.pushHandler = pushHandler;
this.codec = codec;
//注意这里
this.async = newRedisAsyncCommandsImpl();
this.sync = newRedisSyncCommandsImpl();
this.reactive = newRedisReactiveCommandsImpl();
}

1.2、LettuceInvoker的from方法

这里阅读源码的难点在于:@FunctionalInterface函数式接口和lambda表达式。

首先需要了解两个前提:

  • 创建redis连接的时候,在io.lettuce.core.RedisClient#connectStandaloneAsync方法中调用了newStatefulRedisConnection方法,最终new了一个StatefulRedisConnectionImpl对象,注意其中的async属性。
  • LettuceInvoker的connection属性就是这个async,即RedisAsyncCommandsImpl对象。

下面的源码,通过匿名内部类和lambda表达式,表达多种创建同一个对象的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public Boolean set(byte[] key, byte[] value) {
//创建function的方式一
LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = new LettuceInvoker.ConnectionFunction2<byte[], byte[], String>() {
@Override
public RedisFuture<String> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, byte[] key, byte[] value) {
return connection.set(key,value);
}
};
//创建function的方式二
LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = (connection, key1, value1) -> connection.set(key1, value1);
//创建function的方式三
LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = RedisStringAsyncCommands::set;
return connection.invoke().from(function, key, value)
.get(Converters.stringToBooleanConverter());
}
@FunctionalInterface
interface ConnectionFunction2<T1, T2, R> {
//RedisClusterAsyncCommands带有两个参数调用方法,返回值为io.lettuce.core.RedisFuture
RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, T1 t1, T2 t2);
}
//组合pipeline
<R, T1, T2> SingleInvocationSpec<R> from(ConnectionFunction2<T1, T2, R> function, T1 t1, T2 t2) {
Assert.notNull(function, "ConnectionFunction must not be null");
//创建function2的方式一
ConnectionFunction0<R> function2 = new ConnectionFunction0<R>() {
@Override
public RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection) {
return function.apply(connection, t1, t2);
}
};
//创建function2的方式二
ConnectionFunction0<R> function2 = connection -> function.apply(connection, t1, t2);
return from(function2);
}

@FunctionalInterface
interface ConnectionFunction0<R> {
//RedisClusterAsyncCommands无参数调用方法,返回值为io.lettuce.core.RedisFuture
RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection);
}
//组合pipeline
<R> SingleInvocationSpec<R> from(ConnectionFunction0<R> function) {
Assert.notNull(function, "ConnectionFunction must not be null");
//创建parent的方式一
Supplier<RedisFuture<R>> parent = new Supplier<RedisFuture<R>>() {
@Override
public RedisFuture<R> get() {
return function.apply(connection);
}
};
//创建parent的方式二
Supplier<RedisFuture<R>> parent = () -> function.apply(connection);
return new DefaultSingleInvocationSpec<>(parent, synchronizer);
}

从代码中可以看出,Spring的LettuceInvoker对RedisClusterAsyncCommands做了一个编排,组装好DefaultSingleInvocationSpec对象,最后在调用SingleInvocationSpec的get方法。

1.3、SingleInvocationSpec的get方法

1
2
3
4
5
6
//.get(Converters.stringToBooleanConverter());
@Override
public <T> T get(Converter<S, T> converter) {
Assert.notNull(converter, "Converter must not be null");
return synchronizer.invoke(parent, converter, () -> null);
}
  • 测试时,这里的converter为Converters.stringToBooleanConverter():将返回的“OK”字符串转Boolean true
  • parent为RedisFuture

synchronizer为LettuceInvoker.Synchronizer,在创建LettuceInvoker对象的过程中,可以看到传入的synchronizer。

换一种方式看LettuceInvoker.Synchronizer对象创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
LettuceInvoker.Synchronizer synchronizer = new LettuceInvoker.Synchronizer() {
@Override
public Object doInvoke(Supplier<RedisFuture<Object>> futureSupplier, Converter<Object, Object> converter, Supplier<Object> nullDefault) {
try {
//先执行futureSupplier.get(),之后再执行await
Object result = await(futureSupplier.get());
if (result == null) {
return nullDefault.get();
}
return converter.convert(result);
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
};
@Nullable
private <T> T await(RedisFuture<T> cmd) {
if (isMulti) {
return null;
}
try {
return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
} catch (RuntimeException e) {
throw convertLettuceAccessException(e);
}
}
  • 通过执行futureSupplier.get(),通过LettuceInvoker层层编排,最终触发RedisAsyncCommandsImpl#set(K, V)方法。源码看起来比较费劲,通过debug一步步操作,能清晰一点点。
  • 再执行await后,等待timeout时间,从RedisFuture获取结果。

至此终于到了核心执行redis命令的地方。

2、RedisStringAsyncCommands的set方法执行流程

RedisAsyncCommandsImpl实现了AbstractRedisAsyncCommands类,set方法核心实现都在抽象类中:

1
2
3
4
5
6
7
8
9
//RedisCommandBuilder(RedisCodec<K, V> codec)   
//redis命令组装
private final RedisCommandBuilder<K, V> commandBuilder;
@Override
public RedisFuture<String> set(K key, V value) {
//commandBuilder.set(key, value) 组装好Command对象
//之后调用dispatch发送到redis
return dispatch(commandBuilder.set(key, value));
}

2.1、Redis命令组装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Command<K, V, String> set(K key, V value) {
//指定操作为SET
//redis操作结果通过StatusOutput接收
return createCommand(SET, new StatusOutput<>(codec), key, value);
}
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
//redis操作的参数
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addValue(value);
return createCommand(type, output, args);
}
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
return new Command<>(type, output, args);
}
public class Command<K, V, T> implements RedisCommand<K, V, T> {
//类型,比如SET操作
private final ProtocolKeyword type;
//操作的数据,比如key和val
protected CommandArgs<K, V> args;
//操作redis后的结果
protected CommandOutput<K, V, T> output;
}

2.2、Redis命令发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
//组装异步命令
AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
//测试时connection为StatefulRedisConnectionImpl对象
//通过redis连接对象发送redis命令
RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
//结果在这个RedisCommand的output属性中
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand<K, V, T>) dispatched;
}
return asyncCommand;
}
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
//command预处理,根据不同的命令配置一些异步处理
//如:auth命令之后成功之后把password写入到相应变量中
//select db操作成功之后把db值写入到相应变量中等等
RedisCommand<K, V, T> toSend = preProcessCommand(command);
//MULTI操作完成后,将MultiOutput结果重置
potentiallyEnableMulti(command);
//核心方法在父类中:RedisChannelHandler
return super.dispatch(toSend);
}
private final RedisChannelWriter channelWriter;
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
if (debugEnabled) {
logger.debug("dispatching command {}", cmd);
}
//使用装饰者模式实现命令跟踪,本次分析不关注
if (tracingEnabled) {
RedisCommand<K, V, T> commandToSend = cmd;
TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
if (provider == null) {
commandToSend = new TracedCommand<>(cmd,clientResources.tracing().initialTraceContextProvider().getTraceContext());
}
return channelWriter.write(commandToSend);
}
//看这里
return channelWriter.write(cmd);
}

首先看一下channelWriter是啥,通过RedisChannelHandler的构造函数向上查看,回到了获取Redis连接的io.lettuce.core.RedisClient#connectStandaloneAsync方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
RedisURI redisURI, Duration timeout) {
assertNotNull(codec);
checkValidRedisURI(redisURI);

logger.debug("Trying to get a Redis connection for: {}", redisURI);
//创建一个有状态的EndPoint用于抽象底层channel的实现,
// DefaultEndpoint内部封装断线重连、重连后成功后回放连接失败期间的command。
// 同时封装了AT_MOST_ONCE、AT_LEAST_ONCE的可靠性实现(该逻辑是基于内存的,所以并不可靠)。
DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
RedisChannelWriter writer = endpoint;

//装饰器,添加支持过期时间的执行命令
if (CommandExpiryWriter.isSupported(getOptions())) {
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
}

//装饰器,添加Redis事件监听
if (CommandListenerWriter.isSupported(getCommandListeners())) {
writer = new CommandListenerWriter(writer, getCommandListeners());
}

// 创建StatefulRedisConnectionImpl对象,StatefulRedisConnectionImpl对外提供RedisCommand对象,内部基于writer发送命令。
// 此时并没有真正的创建物理连接,该类本身是无状态、线程安全的。
//StatefulRedisConnectionImpl的构造函数中已经创建了sync、async、reactive三种类型的RedisCommand。
// 基于RedisCodec对key和value序列化,通过write把命令真正的发出去。
StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

//异步创建Redis物理连接,返回future对象。后面可以看到future中返回的对象其实还是上面的connection
ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
() -> new CommandHandler(getOptions(), getResources(), endpoint));

//当CompletableFuture的任务不论是正常完成还是出现异常它都会调用whenComplete这回调函数
future.whenComplete((channelHandler, throwable) -> {
//正常完成:whenComplete返回结果和上级任务一致,异常为null;
//出现异常:whenComplete返回结果为null,异常为上级任务的异常;
if (throwable != null) {
//发生异常后,异步关闭连接
connection.closeAsync();
}
});

return future;
}

从源码中可以看到:channelWriter就是DefaultEndpoint对象,也可能是通过装饰器模式增加一些支持过期时间等功能。下面看channelWriter的write方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

LettuceAssert.notNull(command, "Command must not be null");
//连接、命令数量等校验当前操作是否可以执行,
//Lettuce内部维护了一个保存已经发送但是还没有收到Redis消息的Command的stack
//可以配置这个stack的长度,防止Redis不可用时stack太长导致内存溢出。
//如果这个stack已经满了,validateWrite会抛出异常
RedisException validation = validateWrite(1);
if (validation != null) {
command.completeExceptionally(validation);
return command;
}

try {
//共享排他锁,底层使用ReentrantLock
//允许多个共享锁(写入程序)同时处理其工作。如果请求独占锁,则独占锁请求者将等待所有共享锁被释放,并且独占工作线程被允许。
//独占锁定是可重入的。允许独占锁所有者获取和释放共享锁。
//incrementWriters相当于获取一个共享锁,当channel状态发生变化的时候,
//如断开连接时会获取排他锁执行一些清理操作。
sharedLock.incrementWriters();

//是否失活状态
if (inActivation) {
command = processActivationCommand(command);
}
//autoFlushCommands默认为true,即每执行一个Redis命令就执行Flush操作发送给Redis,
//如果设置为false,则需要手动flush。
//由于flush操作相对较重,在某些场景下需要继续提升Lettuce的吞吐量可以考虑设置为false。
if (autoFlushCommands) {
//通过io.netty.channel.Channel#isActive判断连接状态
if (isConnected()) {
//核心方法:redis命令写入Channel,并Flush
writeToChannelAndFlush(command);
} else {
//如果当前channel连接已经断开就先放入Buffer中,直接返回AsyncCommand,
//重连之后会把Buffer中的Command再次尝试通过channel发送到Redis中
writeToDisconnectedBuffer(command);
}

} else {
//如果不是立即flush,则将redis命令写入Buffer队列中,等待手动flush
writeToBuffer(command);
}
} finally {
//释放共享锁
sharedLock.decrementWriters();
if (debugEnabled) {
logger.debug("{} write() done", logPrefix());
}
}
//返回命令
return command;
}
private static final AtomicIntegerFieldUpdater<DefaultEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater
.newUpdater(DefaultEndpoint.class, "queueSize");
// access via QUEUE_SIZE
@SuppressWarnings("unused")
private volatile int queueSize = 0;
private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
//queueSize原子性+1操作
QUEUE_SIZE.incrementAndGet(this);
//核心方法
ChannelFuture channelFuture = channelWriteAndFlush(command);

//至多一次
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
}
//至少一次
if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
//监听结果,重试操作
channelFuture.addListener(RetryListener.newInstance(this, command));
}
}
  • 使用了AtomicIntegerFieldUpdater,原子性地更新DefaultEndpoint对象的int类型的queueSize属性,该属性无须被声明成AtomicInteger。
1
2
3
4
5
6
7
8
9
protected volatile Channel channel;
private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {

if (debugEnabled) {
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
}
//io.netty.channel.Channel
return channel.writeAndFlush(command);
}

至此,redis命令写入到了Netty的Channel中,channel会把需要redis命令放入Channel对应的EventLoop的队列中,流程就结束了,EventLoop是一个SingleThreadEventExector,它会回调创建Netty的Bootstrap时配置的CommandHandler的write方法,CommandHandler继承io.netty.channel.ChannelDuplexHandler。代码中的channel创建过程、Channel与EventLoop的关系,Netty处理流程等,需要等学习Netty的时候再分析。

2.3、CommandHandler写命令到Redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Override
@SuppressWarnings("unchecked")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (debugEnabled) {
logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
}
//Redis命令处理
if (msg instanceof RedisCommand) {
writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
return;
}

if (msg instanceof List) {
List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;
if (batch.size() == 1) {
writeSingleCommand(ctx, batch.get(0), promise);
return;
}
writeBatch(ctx, batch, promise);
return;
}
if (msg instanceof Collection) {
writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
}
}
private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {

if (!isWriteable(command)) {
promise.trySuccess();
return;
}
//redis命令压栈,用于保证获取到的Redis返回结果可以与这个Redis命令相对应
addToStack(command, promise);
//跟踪记录
attachTracing(ctx, command);
//调用ChannelHandlerContext把命令真正发送给Redis
//在发送给Redis之前会由CommandEncoder类对RedisCommand进行编码后写入ByteBuf
ctx.write(command, promise);
}
private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();
private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {

try {
//再次验证队列是否满了,如果满了就抛出异常
validateWrite(1);
//command.getOutput() == null意味这个这个Command不需要Redis返回
if (command.getOutput() == null) {
// fire&forget commands are excluded from metrics
complete(command);
}
//metrics统计
RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
//无论promise是什么类型的,最终都会把command放入到stack中
//stack是一个基于数组实现的双向队列
if (promise.isVoid()) {
//如果promise不是Future类型的就直接把当前command放入到stack
stack.add(redisCommand);
} else {
//如果promise是Future类型的,就等future完成后把当前command放入到stack中
promise.addListener(AddToStack.newInstance(stack, redisCommand));
}
} catch (Exception e) {
command.completeExceptionally(e);
throw e;
}
}

到了这一步,redis命令才真正的发送到Redis中。

2.4、CommandHandler从Redis获取命令执行结果

Netty在收到Redis服务端返回的消息之后就会回调CommandHandler的channelRead方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//buffer = ctx.alloc().buffer(8192 * 8);
private ByteBuf buffer;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//io.netty.buffer.ByteBuf
ByteBuf input = (ByteBuf) msg;
//增加一个信息对象,记录当前上下文信息,当内存泄漏检测到该对象有泄露时,可提供给用户
input.touch("CommandHandler.read(…)");
//refCnt 引用计数
if (!input.isReadable() || input.refCnt() == 0) {
logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt());
return;
}
if (debugEnabled) {
logger.debug("{} Received: {} bytes, {} commands in the stack", logPrefix(), input.readableBytes(), stack.size());
}

try {
if (buffer.refCnt() < 1) {
logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
return;
}
if (debugEnabled && ctx.channel() != channel) {
logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel());
return;
}
if (traceEnabled) {
logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
}
buffer.touch("CommandHandler.read(…)");
buffer.writeBytes(input);
//核心:解码操作
decode(ctx, buffer);
} finally {
input.release();
}
}
//向redis发送的命令存储在这个队列中
private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
//省略一堆代码

//获取但不删除此deque表示的队列头,如果此deque为空,则返回null
RedisCommand<?, ?, ?> command = stack.peek();
try {
//解密
if (!decode(ctx, buffer, command)) {
hasDecodeProgress = true;
decodeBufferPolicy.afterPartialDecode(buffer);
return;
}
} catch (Exception e) {

ctx.close();
throw e;
}
}

解密完毕后,会调用command.complete()方法,即:io.lettuce.core.protocol.AsyncCommand#complete。

核心逻辑就是将从redis中获取的结果塞到CompletableFuture中,便于1.3小节中LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS)获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//向redis发送的命令存储在这个队列中
private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
//省略一堆代码

//获取但不删除此deque表示的队列头,如果此deque为空,则返回null
RedisCommand<?, ?, ?> command = stack.peek();
try {
//解密
if (!decode(ctx, buffer, command)) {
hasDecodeProgress = true;
decodeBufferPolicy.afterPartialDecode(buffer);
return;
}
} catch (Exception e) {

ctx.close();
throw e;
}
}
//依次查看源码,走到此方法:
private RedisStateMachine rsm;
protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
//buffer:redis返回的数据
//output:redis命令对应的结果接受对象,我的例子中为:io.lettuce.core.output.StatusOutput
return rsm.decode(buffer, output, command::completeExceptionally);
}
/**
* 尝试解码redis响应并返回指示是否已读取完整响应的标志。
*
* @param buffer Buffer containing data from the server.
* @param output Current command output.
* @param errorHandler the error handler
* @return true if a complete response was read.
*/
public boolean decode(ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) {
buffer.touch("RedisStateMachine.decode(…)");
if (isEmpty(stack)) {
add(stack, new State());
}
if (output == null) {
return isEmpty(stack);
}
//具体解码操作
boolean resp3Indicator = doDecode(buffer, output, errorHandler);
if (debugEnabled) {
logger.debug("Decode done, empty stack: {}", isEmpty(stack));
}
if (isDiscoverProtocol()) {
if (resp3Indicator) {
setProtocolVersion(ProtocolVersion.RESP3);
} else {
setProtocolVersion(ProtocolVersion.RESP2);
}
}
return isEmpty(stack);
}

private boolean doDecode(ByteBuf buffer, CommandOutput<?, ?, ?> output, Consumer<Exception> errorHandler) {

boolean resp3Indicator = false;

State.Result result;

while (!isEmpty(stack)) {
State state = peek(stack);

if (state.type == null) {
if (!buffer.isReadable()) {
break;
}
state.type = readReplyType(buffer);

if (state.type == HELLO_V3 || state.type == MAP) {
resp3Indicator = true;
}

buffer.markReaderIndex();
}
//测试时,type为 SINGLE('+', RedisStateMachine::handleSingle),
result = state.type.handle(this, state, buffer, output, errorHandler);
if (State.Result.BREAK_LOOP.equals(result)) {
break;
} else if (State.Result.CONTINUE_LOOP.equals(result)) {
continue;
}
buffer.markReaderIndex();
remove(stack);

output.complete(size(stack));
}

return resp3Indicator;
}
//调用handleSingle方法
static State.Result handleSingle(RedisStateMachine rsm, State state, ByteBuf buffer, CommandOutput<?, ?, ?> output,
Consumer<Exception> errorHandler) {
ByteBuffer bytes;

if ((bytes = rsm.readLine(buffer)) == null) {
return State.Result.BREAK_LOOP;
}

if (!QUEUED.equals(bytes)) {
rsm.safeSetSingle(output, bytes, errorHandler);
}
return State.Result.NORMAL_END;
}
protected void safeSetSingle(CommandOutput<?, ?, ?> output, ByteBuffer bytes, Consumer<Exception> errorHandler) {

try {
output.set(bytes);
} catch (Exception e) {
errorHandler.accept(e);
}
}
public class StatusOutput<K, V> extends CommandOutput<K, V, String> {

private static final ByteBuffer OK = StandardCharsets.US_ASCII.encode("OK");

public StatusOutput(RedisCodec<K, V> codec) {
super(codec, null);
}

@Override
public void set(ByteBuffer bytes) {
output = OK.equals(bytes) ? "OK" : decodeAscii(bytes);
}

}

简单总结一下就是:获取到redis的相应后,解码,最终写入到“redis命令”对象中的output属性中。

2.5、Lettuce怎么保证发送命令和返回结果一一对应?

  • Lettuce与Redis之间只有一条tcp连接
  • Lettuce将redis命令放入stack时是有序(代码中主要使用ArrayDeque<RedisCommand> 数组双端队列来实现功能)
  • tcp协议本身是有序
  • redis是单线程处理请求的,所以Redis返回的消息也是有序的
  • Lettuce的断线重连操作ConnectionWatchdog

这样就能保证Redis中返回的消息一定对应着stack中的第一个RedisCommand。[

](https://blog.csdn.net/weixin_45145848/article/details/103710855)

参考

Netty详解之九:ByteBuf介绍_longhuihu的博客-CSDN博客_bytebuf

Netty详解之九:使用ByteBuf_longhuihu的博客-CSDN博客_bytebuf retain

Redis客户端Lettuce源码【四】Lettuce是如何断线重连的_杨_同_学的博客-CSDN博客_lettuce 自动重连