  • SpringRedisGenericCache: stores data as plain key-value pairs and supports per-key expiration.
  • SpringRedisCache: stores data in Redis hash structures.

Taking the setBytes method of SpringRedisGenericCache as an example, this section analyzes how RedisTemplate operates on Redis.

@Override
public void setBytes(String key, byte[] bytes, long timeToLiveInSeconds) {
    if (timeToLiveInSeconds <= 0) {
        // A non-positive TTL is invalid: log it and fall back to the two-argument overload (the TTL is skipped)
        log.debug(String.format("Invalid timeToLiveInSeconds value : %d , skipped it.", timeToLiveInSeconds));
        setBytes(key, bytes);
    } else {
        // SETEX: store the value together with its TTL in a single command
        redisTemplate.execute((RedisCallback<List<byte[]>>) redis -> {
            redis.setEx(_key(key), (int) timeToLiveInSeconds, bytes);
            return null;
        });
    }
}

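As the source shows, RedisTemplate's execute method takes a RedisCallback as its argument. The callback is invoked once a RedisConnection has been obtained, and it contains the low-level code that actually operates on Redis. With this design, RedisTemplate hides the acquisition and release of the Redis connection from the caller while leaving the concrete Redis operations in the caller's hands.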

/*
 * (non-Javadoc)
 * @see org.springframework.data.redis.core.RedisOperations#execute(org.springframework.data.redis.core.RedisCallback)
 */
@Override
@Nullable
public <T> T execute(RedisCallback<T> action) {
    // The two-argument overload (not shown) delegates to the method below with pipeline = false
    return execute(action, isExposeConnection());
}

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
    // Get the RedisConnectionFactory; here it is a LettuceConnectionFactory
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    // Get the RedisConnection; underneath this calls LettuceConnectionFactory.getConnection()
    RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);

    try {

        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);

        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }

        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
        // Invoke the callback: the caller decides which Redis commands to run,
        // and the RedisConnection is handed to the caller as the argument
        T result = action.doInRedis(connToExpose);

        // close pipeline
        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }

        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        // Release the RedisConnection
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}

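The template method pattern, combined with a callback, implements the whole flow of a Redis operation:

  • Get the RedisConnectionFactory; in the test setup this is a LettuceConnectionFactory.
  • Get the RedisConnection; underneath this calls LettuceConnectionFactory.getConnection().
  • Invoke the callback: the caller decides which Redis commands to run, and the RedisConnection is handed to the caller as the argument.
  • Release the RedisConnection.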
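
The flow above can be illustrated with a minimal, JDK-only sketch. `MiniTemplate` and `FakeConnection` are hypothetical stand-ins invented for this illustration (they are not Spring classes); the point is only that acquisition and release live in the template while the operation itself is supplied by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for RedisConnection: it only records the commands it receives.
class FakeConnection {
    private final List<String> log;

    FakeConnection(List<String> log) {
        this.log = log;
    }

    String setEx(String key, long seconds, String value) {
        log.add("SETEX " + key + " " + seconds + " " + value);
        return "OK";
    }
}

// Hypothetical template: owns acquisition and release, delegates the operation to a callback.
class MiniTemplate {
    final List<String> log = new ArrayList<>();

    <T> T execute(Function<FakeConnection, T> action) {
        log.add("acquire");
        FakeConnection conn = new FakeConnection(log);
        try {
            // The caller decides which command to run on the connection.
            return action.apply(conn);
        } finally {
            // Runs even when the callback throws, so the connection is always released.
            log.add("release");
        }
    }
}

class TemplateCallbackDemo {
    public static void main(String[] args) {
        MiniTemplate template = new MiniTemplate();
        String reply = template.execute(conn -> conn.setEx("k", 60, "v"));
        System.out.println(reply + " " + template.log);
    }
}
```

Because the release step sits in a `finally` block, a callback that throws still leaves the template in a clean state, which is the property the real `execute` method provides through `RedisConnectionUtils.releaseConnection`.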

2.1 Obtaining a RedisConnection

2.1.1 The Redis connection factory provided by Spring

RedisConnectionFactory -> ExceptionTranslatingConnectionProvider -> LettucePoolingConnectionProvider -> StandaloneConnectionProvider

/**
 * Obtain a connection from the associated {@link LettuceConnectionProvider}.
 *
 * @return the connection.
 */
private StatefulConnection<E, E> getNativeConnection() {
    // connectionProvider is an ExceptionTranslatingConnectionProvider
    return connectionProvider.getConnection(StatefulConnection.class);
}

// ExceptionTranslatingConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    try {
        // delegate is a LettucePoolingConnectionProvider
        return delegate.getConnection(connectionType);
    } catch (RuntimeException e) {
        // Translate the exception
        throw translateException(e);
    }
}

// LettucePoolingConnectionProvider
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(32);

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    // GenericObjectPool is org.apache.commons.pool2.impl.GenericObjectPool
    GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
        // connectionProvider is a StandaloneConnectionProvider
        return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                poolConfig, false);
    });

    try {
        StatefulConnection<?, ?> connection = pool.borrowObject();
        poolRef.put(connection, pool);
        return connectionType.cast(connection);
    } catch (Exception e) {
        throw new PoolException("Could not get a resource from the pool", e);
    }
}

// StandaloneConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
    return getConnectionAsync(connectionType, redisURISupplier.get());
}

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

    if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
        return connectionType.cast(client.connectSentinel());
    }

    if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
        return connectionType.cast(client.connectPubSub(codec));
    }

    if (StatefulConnection.class.isAssignableFrom(connectionType)) {
        // Get the Redis connection; client here is an io.lettuce.core.RedisClient
        return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
                .orElseGet(() -> client.connect(codec)));
    }

    throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}
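
The layering in this chain (exception translation on the outside, pooling in the middle, connection creation at the core) can be sketched with plain JDK classes. Everything below is a simplified illustration under assumed names: `Provider`, `StandaloneProvider`, `PoolingProvider` and `TranslatingProvider` are hypothetical, connections are represented as strings, and the pool is a bare deque rather than commons-pool2.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical provider interface, loosely modeled on the chain above.
interface Provider {
    String getConnection();
}

// Innermost layer: creates a brand-new connection on every call.
class StandaloneProvider implements Provider {
    final AtomicInteger created = new AtomicInteger();

    public String getConnection() {
        return "conn-" + created.incrementAndGet();
    }
}

// Middle layer: hands out idle connections first and only then asks the inner provider.
class PoolingProvider implements Provider {
    private final Provider inner;
    private final Deque<String> idle = new ArrayDeque<>();

    PoolingProvider(Provider inner) {
        this.inner = inner;
    }

    public synchronized String getConnection() {
        String pooled = idle.pollFirst();
        return pooled != null ? pooled : inner.getConnection();
    }

    synchronized void release(String connection) {
        idle.addFirst(connection);
    }
}

// Outermost layer: converts any low-level failure into a single exception type.
class TranslatingProvider implements Provider {
    private final Provider delegate;

    TranslatingProvider(Provider delegate) {
        this.delegate = delegate;
    }

    public String getConnection() {
        try {
            return delegate.getConnection();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Unable to connect", e);
        }
    }
}

class ProviderChainDemo {
    public static void main(String[] args) {
        StandaloneProvider standalone = new StandaloneProvider();
        PoolingProvider pooling = new PoolingProvider(standalone);
        Provider provider = new TranslatingProvider(pooling);

        String first = provider.getConnection();
        pooling.release(first);
        String second = provider.getConnection(); // reused from the pool
        System.out.println(first + " " + second + " created=" + standalone.created.get());
    }
}
```

Each layer only knows the `Provider` interface of the layer beneath it, so pooling or exception translation can be added or removed without touching the code that actually opens connections.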

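2.1.2 How Lettuce creates a connection

The code that creates the Redis connection is hard to read for several reasons:

  • its use of CompletableFuture
  • Lettuce's EventRecorder
  • its use of Reactor's Mono
  • its use of Netty

2.1.2.1 Entry point: connect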

public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
    assertNotNull(redisURI);
    // connectStandaloneAsync creates the connection asynchronously and returns a future;
    // getConnection turns that into a synchronous call by waiting for the result
    return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
}

Following the call chain connectStandaloneAsync -> connectStatefulAsync -> initializeChannelAsync -> initializeChannelAsync0 leads to the code where Netty connects to Redis.
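
The "asynchronous creation, synchronous wait" shape of the connect entry point can be sketched with `CompletableFuture` alone. This is an illustration under assumptions, not Lettuce's implementation: `connectAsync` and `await` are hypothetical helpers, and the connection is represented as a string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SyncOverAsync {

    // Hypothetical asynchronous connect: the work runs on another thread.
    static CompletableFuture<String> connectAsync(String uri) {
        return CompletableFuture.supplyAsync(() -> {
            if (uri.isEmpty()) {
                throw new IllegalArgumentException("empty URI");
            }
            return "connection to " + uri;
        });
    }

    // Blocks until the future completes and unwraps the original failure cause.
    static <T> T await(CompletableFuture<T> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new IllegalStateException("Unable to connect", cause);
        }
    }

    public static void main(String[] args) {
        System.out.println(await(connectAsync("redis://localhost")));
    }
}
```

Keeping the asynchronous method as the single implementation and layering the blocking variant on top means both APIs share one code path.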

2.1.2.2 connectStandaloneAsync

This method mainly creates the StatefulRedisConnectionImpl object. StatefulRedisConnectionImpl exposes the RedisCommand objects to callers and internally sends commands through a writer.
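
As a rough mental model for the source listing in this subsection, the relationship between the connection, the writer and the optional writer decorators can be sketched as follows. All names here (`CommandWriter`, `EndpointWriter`, `ListeningWriter`, `MiniConnection`) are hypothetical, and commands are plain strings; the sketch only shows the decorator stacking, not Lettuce's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer abstraction; in Lettuce the corresponding type is RedisChannelWriter.
interface CommandWriter {
    void write(String command);
}

// Base writer: stands in for the endpoint that owns the channel.
class EndpointWriter implements CommandWriter {
    final List<String> sent = new ArrayList<>();

    public void write(String command) {
        sent.add(command);
    }
}

// Decorator: records an event, then passes the command to the wrapped writer.
class ListeningWriter implements CommandWriter {
    private final CommandWriter delegate;
    final List<String> events = new ArrayList<>();

    ListeningWriter(CommandWriter delegate) {
        this.delegate = delegate;
    }

    public void write(String command) {
        events.add("started " + command);
        delegate.write(command);
    }
}

// Facade: callers see commands, the writer stays an internal detail.
class MiniConnection {
    private final CommandWriter writer;

    MiniConnection(CommandWriter writer) {
        this.writer = writer;
    }

    void set(String key, String value) {
        writer.write("SET " + key + " " + value);
    }
}

class WriterDecoratorDemo {
    public static void main(String[] args) {
        boolean listenersConfigured = true; // assumed configuration flag
        EndpointWriter endpoint = new EndpointWriter();
        CommandWriter writer = endpoint;
        if (listenersConfigured) {
            writer = new ListeningWriter(writer); // wrap only when the feature is enabled
        }
        new MiniConnection(writer).set("k", "v");
        System.out.println(endpoint.sent);
    }
}
```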

private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {

    assertNotNull(codec);
    checkValidRedisURI(redisURI);

    logger.debug("Trying to get a Redis connection for: {}", redisURI);
    // Create a stateful endpoint that abstracts the underlying channel.
    // DefaultEndpoint handles reconnection and, after a successful reconnect, replays the commands
    // issued while the connection was down.
    // It also implements the AT_MOST_ONCE / AT_LEAST_ONCE reliability modes
    // (the buffering is in memory only, so it is not durable).
    DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
    RedisChannelWriter writer = endpoint;

    // Decorator: adds command timeout (expiry) support
    if (CommandExpiryWriter.isSupported(getOptions())) {
        writer = new CommandExpiryWriter(writer, getOptions(), getResources());
    }

    // Decorator: adds command listener support
    if (CommandListenerWriter.isSupported(getCommandListeners())) {
        writer = new CommandListenerWriter(writer, getCommandListeners());
    }

    // Create the StatefulRedisConnectionImpl, which exposes the RedisCommand objects and sends commands through the writer.
    // No physical connection exists yet. The object is thread-safe; the state it carries is
    // connection-level (for example the selected database, see ConnectionState).
    // Its constructor already creates the sync, async and reactive RedisCommand variants.
    // Keys and values are serialized with the RedisCodec, and write is what actually sends a command.
    StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

    // Create the physical Redis connection asynchronously and return a future.
    // As shown later, the object the future completes with is the connection created above.
    ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
            () -> new CommandHandler(getOptions(), getResources(), endpoint));

    // whenComplete runs its callback whether the future completes normally or exceptionally
    future.whenComplete((channelHandler, throwable) -> {
        // Normal completion: the callback receives the result and a null throwable.
        // Failure: the callback receives a null result and the throwable.
        if (throwable != null) {
            // On failure, close the connection asynchronously
            connection.closeAsync();
        }
    });

    return future;
}
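
The cleanup-on-failure idiom at the end of this method can be isolated in a few lines. This is a minimal sketch under assumptions: `Handle` is a hypothetical stand-in for the connection object and only tracks whether it was closed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseOnFailure {

    // Hypothetical connection handle that only tracks whether it was closed.
    static class Handle {
        final AtomicBoolean closed = new AtomicBoolean();

        void closeAsync() {
            closed.set(true);
        }
    }

    // Attaches the cleanup callback and returns the original future, as the method above does.
    static CompletableFuture<Handle> guard(Handle handle, CompletableFuture<Handle> future) {
        future.whenComplete((result, throwable) -> {
            // Exactly one of result / throwable is meaningful for a given completion.
            if (throwable != null) {
                handle.closeAsync();
            }
        });
        return future;
    }

    public static void main(String[] args) {
        Handle handle = new Handle();
        CompletableFuture<Handle> future = guard(handle, new CompletableFuture<>());
        future.completeExceptionally(new RuntimeException("connection refused"));
        System.out.println("closed after failure: " + handle.closed.get());
    }
}
```

Note that the future returned by `whenComplete` is discarded: the callback is a side effect only, and callers keep observing the original future.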

2.1.2.3 connectStatefulAsync

This method builds a ConnectionBuilder and uses it to create the connection.

@SuppressWarnings("unchecked")
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint,
        RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
    // Build a ConnectionBuilder; the connection is created through it
    ConnectionBuilder connectionBuilder;
    if (redisURI.isSsl()) {
        SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
        sslConnectionBuilder.ssl(redisURI);
        connectionBuilder = sslConnectionBuilder;
    } else {
        connectionBuilder = ConnectionBuilder.connectionBuilder();
    }

    ConnectionState state = connection.getConnectionState();
    state.apply(redisURI);
    state.setDb(redisURI.getDatabase());

    // Register the StatefulRedisConnectionImpl
    connectionBuilder.connection(connection);
    // Configuration options that control the behavior of the RedisClient
    connectionBuilder.clientOptions(getOptions());
    // ClientResources carries, among other things, the EventLoopGroup information
    connectionBuilder.clientResources(getResources());
    // Configure the commandHandlerSupplier. The CommandHandler is important: it is the key to
    // the thread safety of StatefulRedisConnectionImpl and is covered in detail later.
    connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
    // Fill the connectionBuilder with the Bootstrap, the channelGroup and further settings.
    // getSocketAddressSupplier resolves the real Redis address from the redisURI; in Sentinel mode,
    // for example, the actual Redis address has to be looked up from the sentinels.
    // The Mono<SocketAddress> is created lazily via Mono.defer.
    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(), redisURI);
    // Prepare the connection initialization (handshake)
    connectionBuilder.connectionInitializer(createHandshake(state));
    // Initialize the Redis connection; returns a ConnectionFuture, and this is where the physical
    // connection is actually created asynchronously
    ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

    // thenApply transforms the result of the previous stage: the channel handler is discarded
    // and the connection passed in above is returned instead
    return future.thenApply(channelHandler -> (S) connection);
}
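
The final `thenApply` in this method is worth isolating, because it explains why the future ends up carrying the connection object that was created before any network activity. A minimal sketch, with the channel represented as a string and `whenChannelReady` as a hypothetical helper:

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyDemo {

    // Completes with the pre-built connection once the channel future succeeds;
    // the channel value itself is ignored. A failure is propagated unchanged in meaning.
    static <C> CompletableFuture<C> whenChannelReady(CompletableFuture<String> channelFuture, C connection) {
        return channelFuture.thenApply(channel -> connection);
    }

    public static void main(String[] args) {
        CompletableFuture<String> channelFuture = new CompletableFuture<>();
        Object connection = new Object(); // built before the channel exists
        CompletableFuture<Object> result = whenChannelReady(channelFuture, connection);

        System.out.println("done before connect: " + result.isDone());
        channelFuture.complete("channel-1");
        System.out.println("same object: " + (result.join() == connection));
    }
}
```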

In getSocketAddressSupplier, the Mono object is created lazily via Mono.defer: nothing is resolved until the Mono is subscribed to.

private Mono<SocketAddress> getSocketAddressSupplier(RedisURI redisURI) {
    return getSocketAddress(redisURI).doOnNext(addr -> logger.debug("Resolved SocketAddress {} using {}", addr, redisURI));
}

protected Mono<SocketAddress> getSocketAddress(RedisURI redisURI) {

    return Mono.defer(() -> {

        if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
            logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(),
                    redisURI.getSentinelMasterId());
            return lookupRedis(redisURI).switchIfEmpty(Mono.error(new RedisConnectionException(
                    "Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId())));

        } else {
            return Mono.fromCallable(() -> getResources().socketAddressResolver().resolve((redisURI)));
        }
    });
}
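
To make the effect of deferring concrete without pulling in Reactor, the sketch below uses a plain `Supplier` as a JDK-only analogue. It is not Reactor code: `AddressResolver` and its methods are hypothetical, and the resolver simply counts how often it is invoked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class AddressResolver {
    final AtomicInteger lookups = new AtomicInteger();

    // Hypothetical lookup; counts how often it actually runs.
    String resolve(String host) {
        lookups.incrementAndGet();
        return host + ":6379";
    }

    // Eager: the lookup runs once, while the pipeline is being assembled.
    Supplier<String> eager(String host) {
        String address = resolve(host);
        return () -> address;
    }

    // Deferred: the lookup runs each time the value is requested,
    // comparable to Mono.defer invoking its supplier for every subscriber.
    Supplier<String> deferred(String host) {
        return () -> resolve(host);
    }
}

class DeferDemo {
    public static void main(String[] args) {
        AddressResolver resolver = new AddressResolver();
        Supplier<String> address = resolver.deferred("localhost");
        System.out.println("lookups before use: " + resolver.lookups.get());
        address.get();
        address.get();
        System.out.println("lookups after two uses: " + resolver.lookups.get());
    }
}
```

Deferring matters when the answer can change between connection attempts, as it may in the Sentinel branch above: a value resolved eagerly would be frozen at assembly time.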

The connectionBuilder method creates the Netty client Bootstrap object and configures the various Netty parameters.

protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
        ConnectionEvents connectionEvents, RedisURI redisURI) {
    // Create the Netty client Bootstrap object
    Bootstrap redisBootstrap = new Bootstrap();
    // Bootstrap options; see a Netty reference (e.g. the book "Netty权威指南") for details
    redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
    connectionBuilder.bootstrap(redisBootstrap);
    // Netty parameter configuration
    connectionBuilder.apply(redisURI);
    // EventLoopGroup
    connectionBuilder.configureBootstrap(!LettuceStrings.isEmpty(redisURI.getSocket()), this::getEventLoopGroup);
    // ChannelGroup
    connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents == this.connectionEvents ? connectionEvents
            : ConnectionEvents.of(this.connectionEvents, connectionEvents));
    // Configure the socket address supplier
    connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}

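2.1.2.4 initializeChannelAsync

This method wires up the asynchronous handling of the SocketAddress and the Channel, and finally calls initializeChannelAsync0, the method that actually obtains the physical connection.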

protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
        ConnectionBuilder connectionBuilder) {

    // The Mono<SocketAddress> was created lazily via Mono.defer
    Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();

    if (clientResources.eventExecutorGroup().isShuttingDown()) {
        throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
    }

    // Create socketAddressFuture: once socketAddressSupplier has resolved the SocketAddress
    // asynchronously, the address is placed into this future
    CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
    // Create channelReadyFuture: once the connection is established, the Channel is placed into this future
    CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

    String uriString = connectionBuilder.getRedisURI().toString();

    // Event recording; relies on jdk.jfr.Event (Java Flight Recorder), available in OpenJDK since JDK 11
    EventRecorder.getInstance().record(
            new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));

    EventRecorder.RecordableEvent event = EventRecorder.getInstance()
            .start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

    // Runs when channelReadyFuture completes
    channelReadyFuture.whenComplete((channel, throwable) -> {
        // Finish the event report
        event.record();
    });

    // Set up what happens after the asynchronous SocketAddress lookup:
    // 1. Put the SocketAddress into socketAddressFuture
    // 2. Call initializeChannelAsync0 with the SocketAddress to actually establish the connection
    socketAddressSupplier
            // Runs on error
            .doOnError(socketAddressFuture::completeExceptionally)
            // Runs on success
            .doOnNext(socketAddressFuture::complete)
            // Subscribe to the Mono; this is what triggers the lookup
            .subscribe(redisAddress -> {
                // isCancelled returns true if the future was cancelled before it completed normally
                if (channelReadyFuture.isCancelled()) {
                    return;
                }
                // Establish the real connection asynchronously; on success the resulting
                // Channel is placed into channelReadyFuture
                initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
            }, channelReadyFuture::completeExceptionally);
    // After the connection is established, the value returned is still the connectionBuilder's
    // connection object, i.e. the StatefulRedisConnectionImpl
    return new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));
}
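
The two-future structure of this method (one future for the resolved address, one for the ready channel, with a cancellation check in between) can be sketched with the JDK alone. This is a simplified, synchronous illustration under assumptions: `TwoStageConnect` is hypothetical, the lazy lookup is a `Supplier` instead of a Mono, and addresses and channels are strings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class TwoStageConnect {
    final CompletableFuture<String> addressFuture = new CompletableFuture<>();
    final CompletableFuture<String> channelFuture = new CompletableFuture<>();

    // Runs the lazy lookup, publishes the address, then starts stage two unless cancelled.
    void start(Supplier<String> lookup) {
        String address;
        try {
            address = lookup.get();
        } catch (RuntimeException e) {
            // A failed lookup fails both futures, so no caller waits forever.
            addressFuture.completeExceptionally(e);
            channelFuture.completeExceptionally(e);
            return;
        }
        addressFuture.complete(address);
        if (channelFuture.isCancelled()) {
            return; // the caller gave up in the meantime: skip the connect
        }
        // Stand-in for the real connect step.
        channelFuture.complete("channel to " + address);
    }

    public static void main(String[] args) {
        TwoStageConnect connect = new TwoStageConnect();
        connect.start(() -> "localhost:6379");
        System.out.println(connect.addressFuture.join() + " / " + connect.channelFuture.join());
    }
}
```

Exposing the address as its own future lets a caller learn which endpoint was chosen even if the subsequent connect fails or is cancelled.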

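2.1.2.5 initializeChannelAsync0

This method drives Netty to create the connection. It relies heavily on CompletableFuture and other asynchronous programming techniques, which makes the code fairly hard to follow.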

private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
        SocketAddress redisAddress) {

    logger.debug("Connecting to Redis at {}", redisAddress);

    Bootstrap redisBootstrap = connectionBuilder.bootstrap();
    // Create the PlainChannelInitializer. When the Channel is initialized it adds a number of handlers
    // (for Netty's handler concept see e.g. the book "Netty权威指南"), such as CommandEncoder,
    // CommandHandler (a very important handler) and ConnectionWatchdog (implements reconnection)
    ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);
    // Register the RedisChannelInitializer with the Bootstrap
    redisBootstrap.handler(initializer);
    // Invoke the callbacks customized through ClientResources
    clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
    // Actually establish the physical connection asynchronously through Netty; returns a ChannelFuture
    ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

    channelReadyFuture.whenComplete((c, t) -> {
        // Cancellation handling
        if (t instanceof CancellationException) {
            // Cancel the Netty connect attempt
            connectFuture.cancel(true);
        }
    });

    // Add a listener to the Netty future
    connectFuture.addListener(future -> {

        if (!future.isSuccess()) {
            logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
            // Reset the endpoint
            connectionBuilder.endpoint().initialState();
            // Propagate the failure
            channelReadyFuture.completeExceptionally(future.cause());
            return;
        }

        // A RedisHandshakeHandler must be present; it initializes the connection using the ConnectionInitializer
        RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);
        if (handshakeHandler == null) {
            // Propagate the failure
            channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
            return;
        }

        // future to synchronize channel initialization. Returns a new future for every reconnect.
        // Runs once the handshake with Redis has completed
        handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {
            // A null throwable means initialization succeeded
            if (throwable == null) {
                logger.debug("Connecting to Redis at {}: Success", redisAddress);
                RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                connection.registerCloseables(closeableResources, connection);
                // Complete channelReadyFuture with the successfully created channel
                channelReadyFuture.complete(connectFuture.channel());
                return;
            }

            logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
            // Reset this endpoint to its initial state, clearing all buffers and possibly closing the bound channel
            connectionBuilder.endpoint().initialState();
            // Propagate the failure
            channelReadyFuture.completeExceptionally(throwable);
        });
    });
}
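
Stripped of the Netty specifics, the method above bridges a listener-based operation and a CompletableFuture in both directions: the listener outcome completes the future, and cancelling the future cancels the operation. A minimal JDK-only sketch of that bridge follows; `ListenerOperation` is a hypothetical stand-in for a Netty ChannelFuture, and the channel is a string.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical listener-style operation, standing in for a Netty ChannelFuture.
class ListenerOperation {
    private BiConsumer<String, Throwable> listener;
    final AtomicBoolean cancelled = new AtomicBoolean();

    void addListener(BiConsumer<String, Throwable> listener) {
        this.listener = listener;
    }

    void cancel() {
        cancelled.set(true);
    }

    void succeed(String channel) {
        listener.accept(channel, null);
    }

    void fail(Throwable cause) {
        listener.accept(null, cause);
    }
}

class FutureBridge {

    static CompletableFuture<String> bridge(ListenerOperation operation, CompletableFuture<String> ready) {
        // Direction 1: cancelling the CompletableFuture cancels the underlying operation.
        ready.whenComplete((channel, throwable) -> {
            if (throwable instanceof CancellationException) {
                operation.cancel();
            }
        });
        // Direction 2: the listener outcome completes the CompletableFuture.
        operation.addListener((channel, cause) -> {
            if (cause != null) {
                ready.completeExceptionally(cause);
            } else {
                ready.complete(channel);
            }
        });
        return ready;
    }

    public static void main(String[] args) {
        ListenerOperation operation = new ListenerOperation();
        CompletableFuture<String> ready = bridge(operation, new CompletableFuture<>());
        operation.succeed("channel-1");
        System.out.println(ready.join());
    }
}
```

In the real method the success path has one more hop, since the future is completed only after the handshake future reported by `RedisHandshakeHandler` has finished, but the bridging idea is the same.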