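This walkthrough approaches the topic from three directions:

  • Startup: loading the J2CacheSpringRedisAutoConfiguration configuration

  • The J2Cache cluster policy, SpringRedisPubSubPolicy

  • Level-2 cache operations: SpringRedisProvider, SpringRedisGenericCache, and SpringRedisCache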

1. Creating the RedisTemplate object

  • J2CacheSpringRedisAutoConfiguration is loaded after RedisAutoConfiguration and before J2CacheAutoConfiguration.
  • The j2cache.redis-client property selects the Redis client, Jedis or Lettuce; this walkthrough uses Lettuce.
  • A LettuceConnectionFactory is built from a RedisStandaloneConfiguration.
  • Finally, the RedisTemplate object is registered as a Spring bean.

@Bean("j2CacheRedisTemplate")
public RedisTemplate<String, Serializable> j2CacheRedisTemplate(
        @Qualifier("j2CahceRedisConnectionFactory") RedisConnectionFactory j2CahceRedisConnectionFactory,
        @Qualifier("j2CacheValueSerializer") RedisSerializer<Object> j2CacheSerializer) {
    RedisTemplate<String, Serializable> template = new RedisTemplate<String, Serializable>();
    template.setKeySerializer(new StringRedisSerializer());
    template.setHashKeySerializer(new StringRedisSerializer());
    template.setDefaultSerializer(j2CacheSerializer);
    template.setConnectionFactory(j2CahceRedisConnectionFactory);
    return template;
}

// j2CahceRedisConnectionFactory is a LettuceConnectionFactory (the bean name is spelled this way in the source).
// j2CacheSerializer is a J2CacheSerializer, backed by net.oschina.j2cache.util.SerializationUtils#g_serializer.
@Primary
@Bean("j2CahceRedisConnectionFactory")
public LettuceConnectionFactory lettuceConnectionFactory(net.oschina.j2cache.J2CacheConfig j2CacheConfig) {
    // Simplified for this walkthrough.
    LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder config = LettucePoolingClientConfiguration.builder();
    config.commandTimeout(Duration.ofMillis(5000));
    config.poolConfig(getGenericRedisPool());
    RedisStandaloneConfiguration single = new RedisStandaloneConfiguration("xxxx", 6379);
    single.setDatabase(10);
    single.setPassword("****");
    LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(single, config.build());
    return connectionFactory;
}

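Now look at the RedisTemplate class:

[Figure: RedisTemplate class hierarchy]

RedisTemplate extends RedisAccessor and implements the RedisOperations and BeanClassLoaderAware interfaces. RedisAccessor holds the Redis connection factory, and RedisOperations defines the Redis operations.

Here connectionFactory is a LettuceConnectionFactory. Its afterPropertiesSet() method does the preparation needed before Redis can be used: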

private @Nullable AbstractRedisClient client;
private @Nullable LettuceConnectionProvider connectionProvider;
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;

public void afterPropertiesSet() {
    // Create the RedisClient.
    this.client = createClient();
    // CODEC is org.springframework.data.redis.connection.lettuce.LettuceConnection#CODEC,
    // a ByteArrayCodec.
    this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));

    // org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#CODEC
    // is a ByteBufferCodec.
    this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
            createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));

    // (omitted)
}

protected AbstractRedisClient createClient() {
    // (omitted)
    RedisURI uri = isDomainSocketAware()
            ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
            : createRedisURIAndApplySettings(getHostName(), getPort());

    // clientConfiguration is a DefaultLettucePoolingClientConfiguration.
    RedisClient redisClient = clientConfiguration.getClientResources() //
            .map(clientResources -> RedisClient.create(clientResources, uri)) //
            .orElseGet(() -> RedisClient.create(uri));
    clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);

    return redisClient;
}
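
  • First, the RedisClient is created.

  • Then the LettuceConnectionProvider is created. This uses the decorator pattern: StandaloneConnectionProvider -> LettucePoolingConnectionProvider -> ExceptionTranslatingConnectionProvider (innermost to outermost). The names give a rough idea of what each layer adds:

    • StandaloneConnectionProvider: obtains standalone Redis connections; Netty is used underneath.
    • LettucePoolingConnectionProvider: pools Redis connections using org.apache.commons.pool2.impl.GenericObjectPool.
    • ExceptionTranslatingConnectionProvider: translates Redis connection exceptions.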
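
The layering described above can be sketched with plain JDK classes. This is only an illustration of the decorator idea, not Spring's implementation: the Provider interface, the three nested classes, and the string "connections" are hypothetical stand-ins, and the pool is a simple Deque rather than GenericObjectPool.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class DecoratorChainSketch {

    /** Hypothetical, much-simplified stand-in for LettuceConnectionProvider. */
    interface Provider {
        String getConnection();

        default void release(String connection) {
            // Nothing to do unless a layer pools connections.
        }
    }

    /** Innermost layer: opens a new "connection" on every call. */
    static final class Standalone implements Provider {
        private int opened = 0;

        @Override
        public String getConnection() {
            opened++;
            return "conn-" + opened;
        }

        int openedCount() {
            return opened;
        }
    }

    /** Decorator: hands out an idle connection before asking the delegate for a new one. */
    static final class Pooling implements Provider {
        private final Provider delegate;
        private final Deque<String> idle = new ArrayDeque<>();

        Pooling(Provider delegate) {
            this.delegate = delegate;
        }

        @Override
        public String getConnection() {
            String pooled = idle.poll();
            return pooled != null ? pooled : delegate.getConnection();
        }

        @Override
        public void release(String connection) {
            idle.push(connection);
        }
    }

    /** Outermost decorator: rethrows low-level failures as one application-level type. */
    static final class ExceptionTranslating implements Provider {
        private final Provider delegate;

        ExceptionTranslating(Provider delegate) {
            this.delegate = delegate;
        }

        @Override
        public String getConnection() {
            try {
                return delegate.getConnection();
            } catch (RuntimeException e) {
                throw new IllegalStateException("Could not obtain a connection", e);
            }
        }

        @Override
        public void release(String connection) {
            delegate.release(connection);
        }
    }

    static String run() {
        Standalone standalone = new Standalone();
        Provider provider = new ExceptionTranslating(new Pooling(standalone));
        String first = provider.getConnection();
        provider.release(first);
        String second = provider.getConnection(); // served from the pool
        return first + "," + second + "," + standalone.openedCount();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because every layer implements the same interface, the caller only sees a Provider and each concern (creation, pooling, exception translation) can be added or removed independently.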

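2. Reading and writing Redis data

  • SpringRedisGenericCache stores data as plain key-value pairs and supports key expiration.
  • SpringRedisCache stores data in Redis hashes.

The setBytes method of SpringRedisGenericCache serves as the example for analyzing how RedisTemplate talks to Redis.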

@Override
public void setBytes(String key, byte[] bytes, long timeToLiveInSeconds) {
    if (timeToLiveInSeconds <= 0) {
        log.debug(String.format("Invalid timeToLiveInSeconds value : %d , skipped it.", timeToLiveInSeconds));
        setBytes(key, bytes);
    } else {
        redisTemplate.execute((RedisCallback<List<byte[]>>) redis -> {
            redis.setEx(_key(key), (int) timeToLiveInSeconds, bytes);
            return null;
        });
    }
}

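As the source shows, RedisTemplate's execute method takes a RedisCallback. The callback runs the low-level Redis code once a RedisConnection has been obtained. This design hides acquiring and releasing the connection from the caller while leaving the actual Redis operation in the caller's hands.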

/*
 * (non-Javadoc)
 * @see org.springframework.data.redis.core.RedisOperations#execute(org.springframework.data.redis.core.RedisCallback)
 */
@Override
@Nullable
public <T> T execute(RedisCallback<T> action) {
    // The two-argument overload (not shown) delegates to the three-argument one below with pipeline = false.
    return execute(action, isExposeConnection());
}

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
    // Get the RedisConnectionFactory; here it is a LettuceConnectionFactory.
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    // Get a RedisConnection; underneath this is LettuceConnectionFactory.getConnection().
    RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);

    try {

        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);

        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }

        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
        // Invoke the callback: the caller decides which Redis methods to run,
        // and the RedisConnection is handed over as the parameter.
        T result = action.doInRedis(connToExpose);

        // close pipeline
        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }

        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        // Release the RedisConnection.
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}

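The whole flow of a Redis operation is implemented with the template method pattern plus a callback:

  • Get the RedisConnectionFactory; in this test setup it is a LettuceConnectionFactory.
  • Get a RedisConnection; underneath this is LettuceConnectionFactory.getConnection().
  • Invoke the callback. The caller decides which Redis methods to run, and the RedisConnection is passed in as a parameter.
  • Release the RedisConnection.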
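
The four steps above can be reduced to a minimal sketch using only the JDK. MiniTemplate and FakeConnection are hypothetical names invented for this illustration; they only mimic the shape of RedisTemplate.execute and RedisConnection and do not talk to Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class TemplateCallbackSketch {

    /** Hypothetical stand-in for RedisConnection; it only records what was called. */
    static final class FakeConnection {
        private final List<String> log;

        FakeConnection(List<String> log) {
            this.log = log;
        }

        String set(String key, String value) {
            log.add("SET " + key);
            return "OK";
        }
    }

    /** The template owns acquire and release; the callback owns the actual operation. */
    static final class MiniTemplate {
        final List<String> log = new ArrayList<>();

        <T> T execute(Function<FakeConnection, T> action) {
            log.add("acquire");
            FakeConnection connection = new FakeConnection(log);
            try {
                return action.apply(connection);
            } finally {
                // Runs on success and on failure, like releaseConnection in the finally block.
                log.add("release");
            }
        }
    }

    static List<String> run() {
        MiniTemplate template = new MiniTemplate();
        template.execute(connection -> connection.set("k", "v"));
        try {
            template.execute(connection -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            // The connection was still released.
        }
        return template.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The second call shows why the release sits in a finally block: even when the callback throws, the log still ends with a release entry.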

2.1 Obtaining a RedisConnection

2.1.1 The Redis connection factory provided by Spring

RedisConnectionFactory -> ExceptionTranslatingConnectionProvider -> LettucePoolingConnectionProvider -> StandaloneConnectionProvider

/**
 * Obtain a connection from the associated {@link LettuceConnectionProvider}.
 *
 * @return the connection.
 */
private StatefulConnection<E, E> getNativeConnection() {
    // connectionProvider is an ExceptionTranslatingConnectionProvider.
    return connectionProvider.getConnection(StatefulConnection.class);
}

// ExceptionTranslatingConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    try {
        // delegate is a LettucePoolingConnectionProvider.
        return delegate.getConnection(connectionType);
    } catch (RuntimeException e) {
        // Translate the exception.
        throw translateException(e);
    }
}

// LettucePoolingConnectionProvider
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(32);

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    // GenericObjectPool is org.apache.commons.pool2.impl.GenericObjectPool.
    GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
        // connectionProvider is a StandaloneConnectionProvider.
        return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                poolConfig, false);
    });

    try {
        StatefulConnection<?, ?> connection = pool.borrowObject();
        poolRef.put(connection, pool);
        return connectionType.cast(connection);
    } catch (Exception e) {
        throw new PoolException("Could not get a resource from the pool", e);
    }
}

// StandaloneConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
    return getConnectionAsync(connectionType, redisURISupplier.get());
}

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

    if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
        return connectionType.cast(client.connectSentinel());
    }

    if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
        return connectionType.cast(client.connectPubSub(codec));
    }

    if (StatefulConnection.class.isAssignableFrom(connectionType)) {
        // Obtain the Redis connection; client here is an io.lettuce.core.RedisClient.
        return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
                .orElseGet(() -> client.connect(codec)));
    }

    throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}

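2.1.2 How Lettuce creates a connection

The connection-creation code is hard to read for several reasons:

  • its use of CompletableFuture
  • Lettuce's EventRecorder
  • its use of Reactor's Mono
  • its use of Netty

2.1.2.1 The entry point: connect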

public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
    assertNotNull(redisURI);
    // connectStandaloneAsync creates the connection asynchronously and returns a future;
    // getConnection turns that into a synchronous call by waiting for the result.
    return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
}

Following connectStandaloneAsync -> connectStatefulAsync -> initializeChannelAsync -> initializeChannelAsync0 leads to the Netty code that actually connects to Redis.
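
The "asynchronous inside, synchronous outside" shape of connect can be sketched as follows. This is an assumption-laden illustration with JDK classes only: connectAsync and getConnection here are hypothetical helpers that mirror the roles of connectStandaloneAsync and getConnection, not Lettuce's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class SyncOverAsyncSketch {

    /** Hypothetical async connect: the result is produced on another thread. */
    static CompletableFuture<String> connectAsync(String host) {
        if (host.isEmpty()) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("host must not be empty"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "connection-to-" + host);
    }

    /** Blocks until the future is done and unwraps the failure so callers see the real cause. */
    static <T> T getConnection(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Unable to connect", e.getCause());
        }
    }

    /** Synchronous facade over the asynchronous connect. */
    static String connect(String host) {
        return getConnection(connectAsync(host));
    }

    public static void main(String[] args) {
        System.out.println(connect("localhost"));
    }
}
```

Keeping the asynchronous method as the single implementation and layering a blocking wrapper on top means the sync and async APIs cannot drift apart.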

2.1.2.2 connectStandaloneAsync

This method mainly creates the StatefulRedisConnectionImpl object, which exposes the RedisCommand objects to callers and sends commands through a writer internally.

private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {

    assertNotNull(codec);
    checkValidRedisURI(redisURI);

    logger.debug("Trying to get a Redis connection for: {}", redisURI);
    // Create a stateful Endpoint that abstracts the underlying channel.
    // DefaultEndpoint encapsulates reconnection and, once the reconnect succeeds,
    // replays the commands issued while the connection was down.
    // It also implements the AT_MOST_ONCE / AT_LEAST_ONCE reliability modes
    // (the buffering is in memory only, so it is not a hard guarantee).
    DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
    RedisChannelWriter writer = endpoint;

    // Decorator: adds command timeout (expiry) support.
    if (CommandExpiryWriter.isSupported(getOptions())) {
        writer = new CommandExpiryWriter(writer, getOptions(), getResources());
    }

    // Decorator: adds command listeners.
    if (CommandListenerWriter.isSupported(getCommandListeners())) {
        writer = new CommandListenerWriter(writer, getCommandListeners());
    }

    // Create the StatefulRedisConnectionImpl, which exposes the RedisCommand objects
    // and sends commands through the writer internally.
    // No physical connection exists yet at this point; the connection object is thread-safe.
    // Its constructor already creates the sync, async and reactive command objects.
    // Keys and values are serialized with the RedisCodec, and write() actually sends the command.
    StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

    // Create the physical Redis connection asynchronously and return a future.
    // As shown later, the object the future yields is still the connection created above.
    ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
            () -> new CommandHandler(getOptions(), getResources(), endpoint));

    // whenComplete runs its callback whether the future completes normally or exceptionally.
    future.whenComplete((channelHandler, throwable) -> {
        // Normal completion: the callback receives the upstream result and a null throwable.
        // Exceptional completion: the callback receives a null result and the upstream exception.
        if (throwable != null) {
            // On failure, close the connection asynchronously.
            connection.closeAsync();
        }
    });

    return future;
}
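
The whenComplete semantics described in the comments at the end of connectStandaloneAsync can be checked with a small JDK-only sketch. The string values are placeholders chosen for this illustration; thenApply is included because connectStatefulAsync uses it to map the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class WhenCompleteSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Normal completion: the callback sees the value and a null error.
        CompletableFuture<String> ok = new CompletableFuture<>();
        ok.whenComplete((value, error) -> log.add("ok: value=" + value + ", error=" + error));
        ok.complete("channel");

        // Exceptional completion: the callback sees a null value and the exception.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.whenComplete((value, error) ->
                log.add("failed: value=" + value + ", error=" + error.getMessage()));
        failed.completeExceptionally(new IllegalStateException("connect refused"));

        // thenApply maps the result of one stage into the input of the next.
        CompletableFuture<Integer> length = ok.thenApply(String::length);
        log.add("length=" + length.join());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both callbacks are registered before the futures complete, so they run on the thread that calls complete or completeExceptionally, which keeps the log order deterministic here.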

2.1.2.3 connectStatefulAsync

This method builds a ConnectionBuilder and uses it to create the connection.

@SuppressWarnings("unchecked")
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint,
        RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
    // Build the ConnectionBuilder that will create the connection.
    ConnectionBuilder connectionBuilder;
    if (redisURI.isSsl()) {
        SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
        sslConnectionBuilder.ssl(redisURI);
        connectionBuilder = sslConnectionBuilder;
    } else {
        connectionBuilder = ConnectionBuilder.connectionBuilder();
    }

    ConnectionState state = connection.getConnectionState();
    state.apply(redisURI);
    state.setDb(redisURI.getDatabase());

    // Hand over the StatefulRedisConnectionImpl.
    connectionBuilder.connection(connection);
    // Options that control the behavior of the RedisClient.
    connectionBuilder.clientOptions(getOptions());
    // ClientResources carries, among other things, the EventLoopGroup setup.
    connectionBuilder.clientResources(getResources());
    // Register the commandHandlerSupplier. The CommandHandler matters:
    // it is key to making StatefulRedisConnectionImpl thread-safe.
    connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
    // Fill the builder with the Bootstrap, the channelGroup and more.
    // getSocketAddressSupplier resolves the real Redis address from the redisURI;
    // in Sentinel mode, for example, the address has to be looked up from the Sentinels.
    // The Mono<SocketAddress> is created lazily via Mono.defer.
    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(), redisURI);
    // Prepare the connection handshake.
    connectionBuilder.connectionInitializer(createHandshake(state));
    // Initialize the Redis channel; this is where the physical connection is created
    // asynchronously. Returns a ConnectionFuture.
    ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

    // thenApply maps the result of the previous stage: the channel handler is
    // discarded and the connection object is passed on instead.
    return future.thenApply(channelHandler -> (S) connection);
}

getSocketAddressSupplier delegates to getSocketAddress, which creates the Mono lazily with Mono.defer.

private Mono<SocketAddress> getSocketAddressSupplier(RedisURI redisURI) {
    return getSocketAddress(redisURI).doOnNext(addr -> logger.debug("Resolved SocketAddress {} using {}", addr, redisURI));
}

protected Mono<SocketAddress> getSocketAddress(RedisURI redisURI) {

    return Mono.defer(() -> {

        if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
            logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(),
                    redisURI.getSentinelMasterId());
            return lookupRedis(redisURI).switchIfEmpty(Mono.error(new RedisConnectionException(
                    "Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId())));

        } else {
            return Mono.fromCallable(() -> getResources().socketAddressResolver().resolve((redisURI)));
        }
    });
}
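
The effect of the lazy Mono.defer wrapper can be approximated without Reactor. The sketch below is an analogy only: a plain java.util.function.Supplier plays the role of the deferred Mono, and resolve is a hypothetical address lookup. As with Mono.defer, nothing is resolved when the supplier is created, and each use triggers a fresh resolution.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazyResolveSketch {

    /** Counts how often the (hypothetical) address lookup really runs. */
    static final AtomicInteger lookups = new AtomicInteger();

    static String resolve(String host) {
        lookups.incrementAndGet();
        return host + ":6379";
    }

    /** Analogue of Mono.defer: wraps the lookup without running it. */
    static Supplier<String> deferred(String host) {
        return () -> resolve(host);
    }

    static String run() {
        lookups.set(0);
        Supplier<String> address = deferred("redis.example");
        int beforeUse = lookups.get();  // still 0: creating the supplier resolved nothing
        String first = address.get();   // first lookup
        address.get();                  // a later use (for example a reconnect) looks up again
        return beforeUse + "," + first + "," + lookups.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Resolving on every use rather than once up front is what lets a reconnect pick up a changed address, for instance after a Sentinel failover.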

The connectionBuilder method creates the Netty client Bootstrap and configures the various Netty settings.

protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
        ConnectionEvents connectionEvents, RedisURI redisURI) {
    // Create the Netty client Bootstrap.
    Bootstrap redisBootstrap = new Bootstrap();
    // Bootstrap options; see a Netty reference such as "Netty权威指南" for details.
    redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
    connectionBuilder.bootstrap(redisBootstrap);
    // Apply Netty settings derived from the URI.
    connectionBuilder.apply(redisURI);
    // EventLoopGroup
    connectionBuilder.configureBootstrap(!LettuceStrings.isEmpty(redisURI.getSocket()), this::getEventLoopGroup);
    // ChannelGroup
    connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents == this.connectionEvents ? connectionEvents
            : ConnectionEvents.of(this.connectionEvents, connectionEvents));
    // Register the socket address supplier.
    connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}

2.1.2.4 initializeChannelAsync

This method wires up the asynchronous handling of the SocketAddress and the Channel, and finally calls initializeChannelAsync0, which obtains the physical connection.

protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
        ConnectionBuilder connectionBuilder) {

    // The Mono<SocketAddress> created lazily via Mono.defer.
    Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();

    if (clientResources.eventExecutorGroup().isShuttingDown()) {
        throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
    }

    // socketAddressFuture receives the SocketAddress once socketAddressSupplier
    // has resolved it asynchronously.
    CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
    // channelReadyFuture receives the Channel once the connection is established.
    CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

    String uriString = connectionBuilder.getRedisURI().toString();

    // Event recording; depends on jdk.jfr.Event, which appears to be available from JDK 11 onward.
    EventRecorder.getInstance().record(
            new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));

    EventRecorder.RecordableEvent event = EventRecorder.getInstance()
            .start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

    // Runs once channelReadyFuture completes.
    channelReadyFuture.whenComplete((channel, throwable) -> {
        // Finish the event report.
        event.record();
    });

    // What happens after the SocketAddress has been resolved asynchronously:
    // 1. the SocketAddress is put into socketAddressFuture;
    // 2. initializeChannelAsync0 is called with it to actually establish the connection.
    socketAddressSupplier
            // Runs on error.
            .doOnError(socketAddressFuture::completeExceptionally)
            // Runs on success.
            .doOnNext(socketAddressFuture::complete)
            // Subscribe to the Mono.
            .subscribe(redisAddress -> {
                // isCancelled() is true if the future was cancelled before it completed normally.
                if (channelReadyFuture.isCancelled()) {
                    return;
                }
                // Establish the real connection asynchronously; on success the resulting
                // Channel is put into channelReadyFuture.
                initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
            }, channelReadyFuture::completeExceptionally);
    // Once connected, the value returned is still the connection held by the
    // connectionBuilder, i.e. the StatefulRedisConnectionImpl.
    return new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));
}
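
The two-future structure of initializeChannelAsync (one future for the resolved address, one for the ready channel, and a final mapping back to the connection) can be sketched with JDK classes. This is a simplified, synchronous illustration: the Supplier<String> resolver and the string "channel" are hypothetical stand-ins for the Mono<SocketAddress> and the Netty Channel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class TwoStageConnectSketch {

    static CompletableFuture<String> connect(Supplier<String> addressResolver) {
        CompletableFuture<String> socketAddressFuture = new CompletableFuture<>();
        CompletableFuture<String> channelReadyFuture = new CompletableFuture<>();
        try {
            // Stage 1: resolve the address and publish it.
            String address = addressResolver.get();
            socketAddressFuture.complete(address);
            // Stage 2: only "connect" if nobody cancelled in the meantime.
            if (!channelReadyFuture.isCancelled()) {
                channelReadyFuture.complete("channel->" + address);
            }
        } catch (RuntimeException e) {
            // A resolution failure fails both futures.
            socketAddressFuture.completeExceptionally(e);
            channelReadyFuture.completeExceptionally(e);
        }
        // The caller gets the connection object, not the raw channel.
        return channelReadyFuture.thenApply(channel -> "connection[" + channel + "]");
    }

    public static void main(String[] args) {
        System.out.println(connect(() -> "127.0.0.1:6379").join());
    }
}
```

Separating the two futures lets a caller distinguish "the address could not be resolved" from "the address was resolved but the channel never became ready".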

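2.1.2.5 initializeChannelAsync0

This method drives Netty to create the connection. It relies heavily on CompletableFuture and other asynchronous constructs, which makes the code hard to follow.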

private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
        SocketAddress redisAddress) {

    logger.debug("Connecting to Redis at {}", redisAddress);

    Bootstrap redisBootstrap = connectionBuilder.bootstrap();
    // Create the PlainChannelInitializer. When the Channel is initialized it adds a number
    // of handlers (see "Netty权威指南" for Netty's handler concept), such as CommandEncoder,
    // CommandHandler (a very important one) and ConnectionWatchdog (which implements reconnection).
    ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);
    // Register the initializer with the Bootstrap.
    redisBootstrap.handler(initializer);
    // Invoke callbacks customized through ClientResources.
    clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
    // Establish the physical connection asynchronously through Netty; returns a ChannelFuture.
    ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

    channelReadyFuture.whenComplete((c, t) -> {
        // Handle cancellation.
        if (t instanceof CancellationException) {
            // Cancel the Netty connect attempt.
            connectFuture.cancel(true);
        }
    });

    // Add a Netty listener.
    connectFuture.addListener(future -> {

        if (!future.isSuccess()) {
            logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
            // Reset the endpoint.
            connectionBuilder.endpoint().initialState();
            // Propagate the failure.
            channelReadyFuture.completeExceptionally(future.cause());
            return;
        }

        // A RedisHandshakeHandler must be present; it initializes the connection
        // through the ConnectionInitializer.
        RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);
        if (handshakeHandler == null) {
            // Propagate the failure.
            channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
            return;
        }

        // future to synchronize channel initialization. Returns a new future for every reconnect.
        // Runs once the Redis handshake has finished.
        handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {
            // A null throwable means initialization succeeded.
            if (throwable == null) {
                logger.debug("Connecting to Redis at {}: Success", redisAddress);
                RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                connection.registerCloseables(closeableResources, connection);
                // Publish the successful result through channelReadyFuture.
                channelReadyFuture.complete(connectFuture.channel());
                return;
            }

            logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
            // Reset the endpoint to its initial state, clearing all buffers and
            // possibly closing the bound channel.
            connectionBuilder.endpoint().initialState();
            // Propagate the failure.
            channelReadyFuture.completeExceptionally(throwable);
        });
    });
}

2.2 Redis operations

Taking a write as the example, this section analyzes how Spring operates on Redis through Lettuce.

// Obtain a LettuceConnection.
RedisConnection connection = connectionFactory.getConnection();
// The SET operation.
connection.set("key".getBytes(StandardCharsets.UTF_8), "123".getBytes(StandardCharsets.UTF_8));

The connection here is an org.springframework.data.redis.connection.lettuce.LettuceConnection.

// Extends the RedisCommands interface.
public interface RedisConnection extends RedisCommands, AutoCloseable { /* omitted */ }

// Declares the various Redis operations.
public interface RedisCommands extends RedisKeyCommands, RedisStringCommands, RedisListCommands, RedisSetCommands,
        RedisZSetCommands, RedisHashCommands, RedisTxCommands, RedisPubSubCommands, RedisConnectionCommands,
        RedisServerCommands, RedisStreamCommands, RedisScriptingCommands, RedisGeoCommands, RedisHyperLogLogCommands {

    @Nullable
    Object execute(String command, byte[]... args);
}

Stepping through with a debugger, the call passes through these methods in order:

  • org.springframework.data.redis.connection.DefaultedRedisConnection#set(byte[], byte[])
  • org.springframework.data.redis.connection.lettuce.LettuceConnection#stringCommands
  • org.springframework.data.redis.connection.lettuce.LettuceStringCommands#set(byte[], byte[])

2.2.1 Spring's LettuceInvoker: calling Lettuce methods through the async connection

@Override
public Boolean set(byte[] key, byte[] value) {
    // (omitted)
    // connection here is still the LettuceConnection.
    return connection.invoke().from(RedisStringAsyncCommands::set, key, value)
            .get(Converters.stringToBooleanConverter());
}

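2.2.1.1 The LettuceInvoker returned by LettuceConnection#invoke()

LettuceInvoker is a utility for invoking Lettuce methods in a functional style. A call is typically expressed as a method reference, with the arguments passed through one of the just or from methods.

The just methods record the method call and evaluate the result immediately.

The from methods allow composing a pipeline that transforms the result with a converter.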

Usage example:

LettuceInvoker invoker = …;

Long result = invoker.just(RedisGeoAsyncCommands::geoadd, key, point.getX(), point.getY(), member);

List<byte[]> result = invoker.fromMany(RedisGeoAsyncCommands::geohash, key, members).toList(it -> it.getValueOrElse(null));

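The resulting RedisFuture is handed to LettuceInvoker to manage.

The Synchronizer can either wait for completion or record the future together with its converter for later processing.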

LettuceInvoker invoke() {
    return invoke(getAsyncConnection());
}

LettuceInvoker invoke(RedisClusterAsyncCommands<byte[], byte[]> connection) {
    return doInvoke(connection, false);
}

private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {
    // (omitted)
    return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
        try {
            Object result = await(future.get());
            if (result == null) {
                return nullDefault.get();
            }
            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    });
}

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}

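getAsyncConnection returns the asynchronous command interface:

  • The analysis of obtaining a RedisConnection showed that StatefulRedisConnectionImpl holds a RedisAsyncCommandsImpl instance.
  • Here the connection's async property (that RedisAsyncCommandsImpl instance) is returned.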

RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    if (isQueueing() || isPipelined()) {
        return getAsyncDedicatedConnection();
    }
    if (asyncSharedConn != null) {
        if (asyncSharedConn instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
        }
        if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
        }
    }
    return getAsyncDedicatedConnection();
}

Where the async object is set and returned:

@Override
public RedisAsyncCommands<K, V> async() {
    return async;
}

public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
        Duration timeout) {

    super(writer, timeout);

    this.pushHandler = pushHandler;
    this.codec = codec;
    this.async = newRedisAsyncCommandsImpl();
    this.sync = newRedisSyncCommandsImpl();
    this.reactive = newRedisReactiveCommandsImpl();
}

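2.2.1.2 The from method of LettuceInvoker

The difficulty in reading this code lies in the @FunctionalInterface functional interfaces and the lambda expressions.

Two facts to keep in mind:

  • When the Redis connection was created, io.lettuce.core.RedisClient#connectStandaloneAsync called newStatefulRedisConnection, which ultimately constructed a StatefulRedisConnectionImpl; note its async property.
  • The connection field of the LettuceInvoker from the previous subsection is exactly this async object, i.e. the RedisAsyncCommandsImpl instance.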

public Boolean set(byte[] key, byte[] value) {
    // Three equivalent ways to build the function, shown side by side for comparison;
    // real code needs only one of them.
    // Way 1: anonymous class
    LettuceInvoker.ConnectionFunction2<byte[], byte[], String> byAnonymousClass = new LettuceInvoker.ConnectionFunction2<byte[], byte[], String>() {
        @Override
        public RedisFuture<String> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, byte[] key, byte[] value) {
            return connection.set(key, value);
        }
    };
    // Way 2: lambda
    LettuceInvoker.ConnectionFunction2<byte[], byte[], String> byLambda = (connection, key1, value1) -> connection.set(key1, value1);
    // Way 3: method reference
    LettuceInvoker.ConnectionFunction2<byte[], byte[], String> function = RedisStringAsyncCommands::set;
    return connection.invoke().from(function, key, value)
            .get(Converters.stringToBooleanConverter());
}

@FunctionalInterface
interface ConnectionFunction2<T1, T2, R> {
    // Calls a RedisClusterAsyncCommands method with two arguments; returns an io.lettuce.core.RedisFuture.
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection, T1 t1, T2 t2);
}

// Compose the pipeline.
<R, T1, T2> SingleInvocationSpec<R> from(ConnectionFunction2<T1, T2, R> function, T1 t1, T2 t2) {
    Assert.notNull(function, "ConnectionFunction must not be null");
    // Two equivalent ways to build function2; real code needs only one.
    // Way 1: anonymous class
    ConnectionFunction0<R> byAnonymousClass = new ConnectionFunction0<R>() {
        @Override
        public RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection) {
            return function.apply(connection, t1, t2);
        }
    };
    // Way 2: lambda
    ConnectionFunction0<R> function2 = connection -> function.apply(connection, t1, t2);
    return from(function2);
}

@FunctionalInterface
interface ConnectionFunction0<R> {
    // Calls a RedisClusterAsyncCommands method with no arguments; returns an io.lettuce.core.RedisFuture.
    RedisFuture<R> apply(RedisClusterAsyncCommands<byte[], byte[]> connection);
}

// Compose the pipeline.
<R> SingleInvocationSpec<R> from(ConnectionFunction0<R> function) {
    Assert.notNull(function, "ConnectionFunction must not be null");
    // Two equivalent ways to build parent; real code needs only one.
    // Way 1: anonymous class
    Supplier<RedisFuture<R>> byAnonymousClass = new Supplier<RedisFuture<R>>() {
        @Override
        public RedisFuture<R> get() {
            return function.apply(connection);
        }
    };
    // Way 2: lambda
    Supplier<RedisFuture<R>> parent = () -> function.apply(connection);
    return new DefaultSingleInvocationSpec<>(parent, synchronizer);
}

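As the code shows, Spring's LettuceInvoker arranges the RedisClusterAsyncCommands call ahead of time: it assembles a DefaultSingleInvocationSpec and only runs the command when SingleInvocationSpec's get method is finally called.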
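
The "assemble first, execute on get" idea can be condensed into a JDK-only sketch. Everything in it is hypothetical and simplified for illustration: AsyncCommands stands in for RedisClusterAsyncCommands, CompletableFuture for RedisFuture, Function2 for ConnectionFunction2, and Spec for DefaultSingleInvocationSpec.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

final class InvokerSketch {

    /** Hypothetical stand-in for RedisClusterAsyncCommands; counts the commands sent. */
    static final class AsyncCommands {
        int sent = 0;

        CompletableFuture<String> set(String key, String value) {
            sent++;
            return CompletableFuture.completedFuture("OK");
        }
    }

    /** Two-argument command, in the spirit of ConnectionFunction2. */
    @FunctionalInterface
    interface Function2<T1, T2, R> {
        CompletableFuture<R> apply(AsyncCommands commands, T1 t1, T2 t2);
    }

    /** Holds the deferred call; nothing is sent until get(...) runs. */
    static final class Spec<R> {
        private final Supplier<CompletableFuture<R>> parent;

        Spec(Supplier<CompletableFuture<R>> parent) {
            this.parent = parent;
        }

        <T> T get(Function<R, T> converter) {
            R result = parent.get().join(); // the command is issued here
            return result == null ? null : converter.apply(result);
        }
    }

    final AsyncCommands commands = new AsyncCommands();

    /** Binds the arguments now and defers the call inside a Supplier. */
    <R, T1, T2> Spec<R> from(Function2<T1, T2, R> function, T1 t1, T2 t2) {
        return new Spec<>(() -> function.apply(commands, t1, t2));
    }

    static String run() {
        InvokerSketch invoker = new InvokerSketch();
        Spec<String> spec = invoker.from(AsyncCommands::set, "k", "v");
        int sentBeforeGet = invoker.commands.sent;
        Boolean ok = spec.get("OK"::equals);
        return sentBeforeGet + "," + ok + "," + invoker.commands.sent;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The counter makes the deferral visible: no command has been sent after from(...) returns, and exactly one has been sent after get(...).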

2.2.1.3 The get method of SingleInvocationSpec

// .get(Converters.stringToBooleanConverter());
@Override
public <T> T get(Converter<S, T> converter) {
    Assert.notNull(converter, "Converter must not be null");
    return synchronizer.invoke(parent, converter, () -> null);
}
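
  • converter is Converters.stringToBooleanConverter(), which turns the returned "OK" string into Boolean true.
  • parent is the Supplier built in from(...); calling its get() yields the RedisFuture.

synchronizer is a LettuceInvoker.Synchronizer. As section 2.2.1.1 shows, it is passed in when the LettuceInvoker is constructed: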

LettuceInvoker.Synchronizer synchronizer = new LettuceInvoker.Synchronizer() {
    @Override
    public Object doInvoke(Supplier<RedisFuture<Object>> futureSupplier, Converter<Object, Object> converter, Supplier<Object> nullDefault) {
        try {
            // futureSupplier.get() runs first and issues the command; await then waits for the result.
            Object result = await(futureSupplier.get());

            if (result == null) {
                return nullDefault.get();
            }

            return converter.convert(result);
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    }
};

@Nullable
private <T> T await(RedisFuture<T> cmd) {
    if (isMulti) {
        return null;
    }
    try {
        return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
    } catch (RuntimeException e) {
        throw convertLettuceAccessException(e);
    }
}

  • Calling futureSupplier.get() triggers RedisStringAsyncCommands#set(K, V).
  • await then waits up to timeout for the RedisFuture to deliver the result.
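
The wait-with-timeout step can be illustrated with the JDK's Future API. The helper below is a rough approximation written for this walkthrough, not the body of LettuceFutures.awaitOrCancel; the exception types and messages are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitOrCancelSketch {

    /** Waits up to the timeout; on timeout the command is cancelled and an exception is thrown. */
    static <T> T awaitOrCancel(Future<T> command, long timeout, TimeUnit unit) {
        try {
            return command.get(timeout, unit);
        } catch (TimeoutException e) {
            command.cancel(true);
            throw new IllegalStateException("Command timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Command failed", e.getCause());
        }
    }

    static String run() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("OK");
        CompletableFuture<String> stuck = new CompletableFuture<>(); // never completes
        String first = awaitOrCancel(done, 100, TimeUnit.MILLISECONDS);
        try {
            awaitOrCancel(stuck, 50, TimeUnit.MILLISECONDS);
            return first + ",no-timeout";
        } catch (IllegalStateException timedOut) {
            return first + ",cancelled=" + stuck.isCancelled();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Cancelling on timeout matters because the caller has stopped waiting: without it, an abandoned command would stay registered as pending.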

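At last we have reached the place where the Redis command is actually executed.

2.2.2 Execution flow of RedisStringAsyncCommands#set

2.3 Releasing the RedisConnection

3. Redis publish/subscribe

  • SpringRedisPubSubPolicy registers listeners for key expiration and deletion events in Redis.
  • SpringRedisPubSubPolicy#publish publishes messages.
  • SpringRedisMessageListener listens for subscribed messages.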
