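- SpringRedisGenericCache: key-value storage, with support for per-key expiration.
- SpringRedisCache: hash-structure storage.
Taking the setBytes method of SpringRedisGenericCache as an example, this section analyzes how RedisTemplate operates on Redis.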

    @Override
    public void setBytes(String key, byte[] bytes, long timeToLiveInSeconds) {
        if (timeToLiveInSeconds <= 0) {
            log.debug(String.format("Invalid timeToLiveInSeconds value : %d , skipped it.", timeToLiveInSeconds));
            setBytes(key, bytes);
        } else {
            redisTemplate.execute((RedisCallback<List<byte[]>>) redis -> {
                redis.setEx(_key(key), (int) timeToLiveInSeconds, bytes);
                return null;
            });
        }
    }

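As the source shows, RedisTemplate's execute method takes a RedisCallback callback interface, which is used to run the low-level Redis operations once a RedisConnection has been obtained. This design hides the acquisition and release of the Redis connection from the caller, while leaving the concrete Redis operations open to the caller.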

    @Override
    @Nullable
    public <T> T execute(RedisCallback<T> action) {
        return execute(action, isExposeConnection());
    }

    // The two-argument overload execute(action, exposeConnection), omitted here,
    // delegates to the method below with pipeline = false.
    @Nullable
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");

        RedisConnectionFactory factory = getRequiredConnectionFactory();
        RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);

        try {
            boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
            RedisConnection connToUse = preProcessConnection(conn, existingConnection);

            boolean pipelineStatus = connToUse.isPipelined();
            if (pipeline && !pipelineStatus) {
                connToUse.openPipeline();
            }

            RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
            T result = action.doInRedis(connToExpose);

            if (pipeline && !pipelineStatus) {
                connToUse.closePipeline();
            }

            return postProcessResult(result, connToUse, existingConnection);
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
        }
    }

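
In the execute method above, the callback receives the result of createRedisConnectionProxy rather than the raw connection whenever exposeConnection is false. A plausible reason is to stop the callback from closing a connection whose lifecycle the template owns. The sketch below illustrates that idea with a JDK dynamic proxy. DemoConnection, RawDemoConnection and CloseSuppressingDemo are hypothetical names invented for this illustration; this is not Spring's actual implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for RedisConnection; not the Spring interface.
interface DemoConnection {
    String get(String key);
    void close();
    boolean isClosed();
}

class RawDemoConnection implements DemoConnection {
    private boolean closed;
    @Override public String get(String key) { return "value-of-" + key; }
    @Override public void close() { closed = true; }
    @Override public boolean isClosed() { return closed; }
}

class CloseSuppressingDemo {
    // Wrap the target so that close() becomes a no-op; every other call is forwarded.
    static DemoConnection closeSuppressingProxy(DemoConnection target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close")) {
                return null; // swallow close(): the template decides when to release
            }
            return method.invoke(target, args);
        };
        return (DemoConnection) Proxy.newProxyInstance(
                DemoConnection.class.getClassLoader(),
                new Class<?>[] { DemoConnection.class },
                handler);
    }

    public static void main(String[] args) {
        RawDemoConnection raw = new RawDemoConnection();
        DemoConnection exposed = closeSuppressingProxy(raw);
        exposed.close();                      // ignored by the proxy
        System.out.println(exposed.get("k")); // prints "value-of-k"
        System.out.println(raw.isClosed());   // prints "false"
    }
}
```

The callback still sees the full connection interface, so it can run any command, but it cannot interfere with the release step in the finally block.
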
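The template method pattern and a callback mechanism together implement the whole flow of a Redis operation:
- Obtain the RedisConnectionFactory; in this test it is a LettuceConnectionFactory.
- Obtain a RedisConnection; underneath, this is LettuceConnectionFactory.getConnection().
- Invoke the callback; the caller decides which Redis methods to run, and the RedisConnection is passed to the caller as a parameter.
- Release the RedisConnection.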
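
The four steps above can be sketched in plain Java without any Redis dependency. This is a minimal illustration of the template-plus-callback idea, not RedisTemplate itself: FakeConnection, FakeConnectionFactory and MiniTemplate are hypothetical names, and the "connection" is just an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a Redis connection.
class FakeConnection {
    private final Map<String, byte[]> store = new HashMap<>();
    void set(String key, byte[] value) { store.put(key, value); }
    byte[] get(String key) { return store.get(key); }
}

// Hypothetical factory that counts how often connections are handed out and returned.
class FakeConnectionFactory {
    private final FakeConnection shared = new FakeConnection();
    int acquired;
    int released;
    FakeConnection getConnection() { acquired++; return shared; }
    void release(FakeConnection conn) { released++; }
}

class MiniTemplate {
    private final FakeConnectionFactory factory;

    MiniTemplate(FakeConnectionFactory factory) { this.factory = factory; }

    // Fixed skeleton: acquire, run the caller's callback, always release.
    <T> T execute(Function<FakeConnection, T> action) {
        FakeConnection conn = factory.getConnection();
        try {
            return action.apply(conn);
        } finally {
            factory.release(conn); // runs even if the callback throws
        }
    }

    public static void main(String[] args) {
        FakeConnectionFactory factory = new FakeConnectionFactory();
        MiniTemplate template = new MiniTemplate(factory);
        template.execute(conn -> { conn.set("k", new byte[] {1, 2}); return null; });
        int length = template.execute(conn -> conn.get("k").length);
        // prints "2 bytes, acquired=2, released=2"
        System.out.println(length + " bytes, acquired=" + factory.acquired + ", released=" + factory.released);
    }
}
```

The caller only writes the lambda; acquiring and releasing never appear in caller code, which is the property the list describes.
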
2.1. Obtaining a RedisConnection
2.1.1. The Redis connection factory provided by Spring
RedisConnectionFactory --> ExceptionTranslatingConnectionProvider --> LettucePoolingConnectionProvider --> StandaloneConnectionProvider

    // LettuceConnectionFactory (shared connection)
    private StatefulConnection<E, E> getNativeConnection() {
        return connectionProvider.getConnection(StatefulConnection.class);
    }

    // ExceptionTranslatingConnectionProvider
    @Override
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        try {
            return delegate.getConnection(connectionType);
        } catch (RuntimeException e) {
            throw translateException(e);
        }
    }

    // LettucePoolingConnectionProvider
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(32);

    @Override
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

        GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                    poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = pool.borrowObject();
            poolRef.put(connection, pool);
            return connectionType.cast(connection);
        } catch (Exception e) {
            throw new PoolException("Could not get a resource from the pool", e);
        }
    }

    // StandaloneConnectionProvider
    @Override
    public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
        return getConnectionAsync(connectionType, redisURISupplier.get());
    }

    @Override
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

        if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
            return connectionType.cast(client.connectSentinel());
        }

        if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
            return connectionType.cast(client.connectPubSub(codec));
        }

        if (StatefulConnection.class.isAssignableFrom(connectionType)) {
            return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
                    .orElseGet(() -> client.connect(codec)));
        }

        throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
    }

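
The delegation chain above layers three responsibilities: translating exceptions, pooling per connection type, and actually creating connections. The following sketch reproduces that layering with plain JDK classes. All names (DemoProvider, DemoConn, CreatingProvider, PoolingProvider, TranslatingProvider) are hypothetical, and an ArrayDeque is used as a deliberately naive pool in place of commons-pool's GenericObjectPool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical connection type and provider contract.
class DemoConn { }

interface DemoProvider {
    <T> T getConnection(Class<T> type);
}

// Innermost layer: creates a new connection for supported types.
class CreatingProvider implements DemoProvider {
    int created;
    @Override public <T> T getConnection(Class<T> type) {
        if (type == DemoConn.class) {
            created++;
            return type.cast(new DemoConn());
        }
        throw new UnsupportedOperationException("Connection type " + type + " not supported!");
    }
}

// Middle layer: one pool per connection type, plus a back-reference for release.
class PoolingProvider implements DemoProvider {
    private final DemoProvider delegate;
    private final Map<Class<?>, Deque<Object>> pools = new ConcurrentHashMap<>();
    private final Map<Object, Deque<Object>> poolRef = new ConcurrentHashMap<>();

    PoolingProvider(DemoProvider delegate) { this.delegate = delegate; }

    @Override public synchronized <T> T getConnection(Class<T> type) {
        Deque<Object> pool = pools.computeIfAbsent(type, t -> new ArrayDeque<>());
        Object conn = pool.isEmpty() ? delegate.getConnection(type) : pool.pop();
        poolRef.put(conn, pool); // remember which pool the connection belongs to
        return type.cast(conn);
    }

    synchronized void release(Object conn) {
        Deque<Object> pool = poolRef.remove(conn);
        if (pool != null) {
            pool.push(conn); // hand the connection back for reuse
        }
    }
}

// Outermost layer: converts low-level runtime exceptions into one exception type.
class TranslatingProvider implements DemoProvider {
    private final DemoProvider delegate;

    TranslatingProvider(DemoProvider delegate) { this.delegate = delegate; }

    @Override public <T> T getConnection(Class<T> type) {
        try {
            return delegate.getConnection(type);
        } catch (RuntimeException e) {
            throw new IllegalStateException("translated: " + e.getMessage(), e);
        }
    }
}
```

Because each layer only knows the DemoProvider interface, the pooling layer can be removed or swapped without touching the other two, which matches how the real providers wrap a delegate.
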
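2.1.2. How Lettuce creates a connection
Reading the code that creates a Redis connection is difficult in several respects:
- The use of CompletableFuture
- Lettuce's EventRecorder
- The use of Reactor's Mono
- The use of Netty
2.1.2.1. The entry point: connect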

    public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {

        assertNotNull(redisURI);

        return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
    }

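Following connectStandaloneAsync -> connectStatefulAsync -> initializeChannelAsync -> initializeChannelAsync0 in turn leads to the code where Netty connects to Redis.
2.1.2.2. connectStandaloneAsync
This method mainly creates the StatefulRedisConnectionImpl object. StatefulRedisConnectionImpl exposes the RedisCommand API to callers and internally sends commands through a writer.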
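
The connect entry point is a synchronous wrapper around an asynchronous call: it builds a future and then blocks on it. The sketch below shows that shape with CompletableFuture only. SyncOverAsync, connectAsync and the returned strings are hypothetical, and the exception mapping is an assumption made for the illustration rather than Lettuce's actual behavior.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class SyncOverAsync {

    // Hypothetical asynchronous connect: completes on another thread.
    static CompletableFuture<String> connectAsync(String uri) {
        return CompletableFuture.supplyAsync(() -> {
            if (uri.isEmpty()) {
                throw new IllegalArgumentException("empty URI");
            }
            return "connection-to-" + uri;
        });
    }

    // Synchronous facade: wait for the future and unwrap its failure cause.
    static String connect(String uri) {
        try {
            return connectAsync(uri).get();
        } catch (ExecutionException e) {
            // get() wraps the original failure; surface the real cause to the caller
            throw new IllegalStateException("Unable to connect to '" + uri + "'", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while connecting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connect("redis://localhost")); // prints "connection-to-redis://localhost"
    }
}
```
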

    private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
            RedisURI redisURI, Duration timeout) {

        assertNotNull(codec);
        checkValidRedisURI(redisURI);

        logger.debug("Trying to get a Redis connection for: {}", redisURI);

        DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
        RedisChannelWriter writer = endpoint;

        if (CommandExpiryWriter.isSupported(getOptions())) {
            writer = new CommandExpiryWriter(writer, getOptions(), getResources());
        }

        if (CommandListenerWriter.isSupported(getCommandListeners())) {
            writer = new CommandListenerWriter(writer, getCommandListeners());
        }

        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
                () -> new CommandHandler(getOptions(), getResources(), endpoint));

        future.whenComplete((channelHandler, throwable) -> {

            if (throwable != null) {
                connection.closeAsync();
            }
        });

        return future;
    }

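
In connectStandaloneAsync the endpoint is itself a writer, and it is optionally wrapped by CommandExpiryWriter and CommandListenerWriter. This is the decorator pattern applied conditionally. A reduced sketch follows; DemoWriter, EndpointWriter, CountingWriter, ListeningWriter and WriterChain are hypothetical names, and the decorators only count or log commands, whereas the real writers do considerably more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for RedisChannelWriter.
interface DemoWriter {
    String write(String command);
}

// Plays the role of the endpoint: the innermost writer that actually sends.
class EndpointWriter implements DemoWriter {
    @Override public String write(String command) { return "sent:" + command; }
}

// Decorator that counts commands before delegating.
class CountingWriter implements DemoWriter {
    private final DemoWriter delegate;
    int seen;
    CountingWriter(DemoWriter delegate) { this.delegate = delegate; }
    @Override public String write(String command) {
        seen++;
        return delegate.write(command);
    }
}

// Decorator that records each command in a listener log before delegating.
class ListeningWriter implements DemoWriter {
    private final DemoWriter delegate;
    private final List<String> listenerLog;
    ListeningWriter(DemoWriter delegate, List<String> listenerLog) {
        this.delegate = delegate;
        this.listenerLog = listenerLog;
    }
    @Override public String write(String command) {
        listenerLog.add(command);
        return delegate.write(command);
    }
}

class WriterChain {
    // Wrap the endpoint only with the decorators that are actually enabled.
    static DemoWriter build(boolean countingEnabled, List<String> listenerLog) {
        DemoWriter writer = new EndpointWriter();
        if (countingEnabled) {
            writer = new CountingWriter(writer);
        }
        if (listenerLog != null) {
            writer = new ListeningWriter(writer, listenerLog);
        }
        return writer;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        DemoWriter writer = build(true, log);
        System.out.println(writer.write("PING")); // prints "sent:PING"
        System.out.println(log);                  // prints "[PING]"
    }
}
```

When no optional feature is enabled, the connection talks to the endpoint directly and pays no wrapping cost.
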
2.1.2.3. connectStatefulAsync
This method builds a ConnectionBuilder and uses it to create the connection.

    @SuppressWarnings("unchecked")
    private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
            Endpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {

        ConnectionBuilder connectionBuilder;
        if (redisURI.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(redisURI);
            connectionBuilder = sslConnectionBuilder;
        } else {
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }

        ConnectionState state = connection.getConnectionState();
        state.apply(redisURI);
        state.setDb(redisURI.getDatabase());

        connectionBuilder.connection(connection);
        connectionBuilder.clientOptions(getOptions());
        connectionBuilder.clientResources(getResources());
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);

        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(),
                redisURI);
        connectionBuilder.connectionInitializer(createHandshake(state));

        ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

        return future.thenApply(channelHandler -> (S) connection);
    }

In getSocketAddressSupplier, the Mono object is created lazily through Mono.defer: the address is not resolved until something subscribes.

    private Mono<SocketAddress> getSocketAddressSupplier(RedisURI redisURI) {
        return getSocketAddress(redisURI).doOnNext(addr -> logger.debug("Resolved SocketAddress {} using {}", addr, redisURI));
    }

    protected Mono<SocketAddress> getSocketAddress(RedisURI redisURI) {

        return Mono.defer(() -> {

            if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
                logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(),
                        redisURI.getSentinelMasterId());
                return lookupRedis(redisURI).switchIfEmpty(Mono.error(new RedisConnectionException(
                        "Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId())));
            } else {
                return Mono.fromCallable(() -> getResources().socketAddressResolver().resolve((redisURI)));
            }
        });
    }

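
The point of Mono.defer here is that address resolution runs at subscription time, and runs again for every new subscription (for example on reconnect), rather than once when the Mono is assembled. The sketch below demonstrates the same eager-versus-deferred distinction using java.util.function.Supplier as a stand-in. It is not Reactor code, and LazyResolver, resolve and the host:port format are hypothetical.

```java
import java.util.function.Supplier;

class LazyResolver {
    int resolutions; // how many times the (pretend) lookup has run

    // Hypothetical address lookup; stands in for DNS or Sentinel resolution.
    String resolve(String host) {
        resolutions++;
        return host + ":6379";
    }

    // Eager: the lookup runs once, when the supplier is built.
    Supplier<String> eager(String host) {
        String resolved = resolve(host);
        return () -> resolved;
    }

    // Deferred: the lookup runs each time the supplier is asked for a value.
    Supplier<String> deferred(String host) {
        return () -> resolve(host);
    }

    public static void main(String[] args) {
        LazyResolver resolver = new LazyResolver();
        Supplier<String> address = resolver.deferred("localhost");
        System.out.println(resolver.resolutions); // prints "0": nothing resolved yet
        System.out.println(address.get());        // prints "localhost:6379"
        address.get();
        System.out.println(resolver.resolutions); // prints "2": resolved once per request
    }
}
```
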
The connectionBuilder method creates the Netty client Bootstrap object and configures Netty's various options.

    protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
            ConnectionEvents connectionEvents, RedisURI redisURI) {

        Bootstrap redisBootstrap = new Bootstrap();
        redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);

        connectionBuilder.bootstrap(redisBootstrap);
        connectionBuilder.apply(redisURI);

        connectionBuilder.configureBootstrap(!LettuceStrings.isEmpty(redisURI.getSocket()), this::getEventLoopGroup);
        connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents == this.connectionEvents
                ? connectionEvents : ConnectionEvents.of(this.connectionEvents, connectionEvents));

        connectionBuilder.socketAddressSupplier(socketAddressSupplier);
    }

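2.1.2.4. initializeChannelAsync
This method handles the SocketAddress and the Channel asynchronously, and finally calls initializeChannelAsync0, the method that actually obtains the physical connection.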

    protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
            ConnectionBuilder connectionBuilder) {

        Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();

        if (clientResources.eventExecutorGroup().isShuttingDown()) {
            throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
        }

        CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
        CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

        String uriString = connectionBuilder.getRedisURI().toString();

        EventRecorder.getInstance().record(
                new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));
        EventRecorder.RecordableEvent event = EventRecorder.getInstance()
                .start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

        channelReadyFuture.whenComplete((channel, throwable) -> {
            event.record();
        });

        socketAddressSupplier
                .doOnError(socketAddressFuture::completeExceptionally)
                .doOnNext(socketAddressFuture::complete)
                .subscribe(redisAddress -> {

                    if (channelReadyFuture.isCancelled()) {
                        return;
                    }
                    initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
                }, channelReadyFuture::completeExceptionally);

        return new DefaultConnectionFuture<>(socketAddressFuture,
                channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));
    }

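
The method above coordinates two futures: one for the resolved address and one for the ready channel, with channel setup skipped if the caller has already cancelled. The sketch below reduces that coordination to its core. It runs synchronously to stay deterministic, TwoStageConnect and its string "channels" are hypothetical, and a Supplier replaces the Mono subscription.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class TwoStageConnect {
    final CompletableFuture<String> addressFuture = new CompletableFuture<>();
    final CompletableFuture<String> channelReadyFuture = new CompletableFuture<>();
    boolean channelInitStarted;

    // What the caller receives: a view that maps the ready channel to a connection.
    CompletableFuture<String> connection() {
        return channelReadyFuture.thenApply(channel -> "connection over " + channel);
    }

    // Stage 1: resolve the address. Stage 2: set up the channel unless cancelled.
    void start(Supplier<String> addressSupplier) {
        String address;
        try {
            address = addressSupplier.get();
        } catch (RuntimeException e) {
            // a failed lookup fails both futures, so no caller waits forever
            addressFuture.completeExceptionally(e);
            channelReadyFuture.completeExceptionally(e);
            return;
        }
        addressFuture.complete(address);
        if (channelReadyFuture.isCancelled()) {
            return; // the caller gave up; do not open a channel
        }
        channelInitStarted = true;
        channelReadyFuture.complete("channel->" + address); // pretend channel setup succeeded
    }

    public static void main(String[] args) {
        TwoStageConnect connect = new TwoStageConnect();
        CompletableFuture<String> connection = connect.connection();
        connect.start(() -> "localhost:6379");
        System.out.println(connection.join()); // prints "connection over channel->localhost:6379"
    }
}
```

Keeping the address in its own future lets a caller inspect where the connection was heading even when the channel stage fails or is cancelled.
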
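2.1.2.5. initializeChannelAsync0
This method drives Netty to create the connection. It relies heavily on CompletableFuture and other asynchronous programming techniques, which makes the code fairly hard to follow.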

    private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
            SocketAddress redisAddress) {

        logger.debug("Connecting to Redis at {}", redisAddress);

        Bootstrap redisBootstrap = connectionBuilder.bootstrap();

        ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);
        redisBootstrap.handler(initializer);

        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

        channelReadyFuture.whenComplete((c, t) -> {

            if (t instanceof CancellationException) {
                connectFuture.cancel(true);
            }
        });

        connectFuture.addListener(future -> {

            if (!future.isSuccess()) {

                logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
                connectionBuilder.endpoint().initialState();
                channelReadyFuture.completeExceptionally(future.cause());
                return;
            }

            RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);

            if (handshakeHandler == null) {
                channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
                return;
            }

            handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {

                if (throwable == null) {

                    logger.debug("Connecting to Redis at {}: Success", redisAddress);
                    RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                    connection.registerCloseables(closeableResources, connection);
                    channelReadyFuture.complete(connectFuture.channel());
                    return;
                }

                logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
                connectionBuilder.endpoint().initialState();
                channelReadyFuture.completeExceptionally(throwable);
            });
        });
    }

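
Much of the difficulty in initializeChannelAsync0 comes from bridging two asynchronous styles: a listener-based connect operation and CompletableFuture. The result future completes only after both the connect and the handshake succeed, and cancelling the result cancels the underlying connect. The sketch below isolates that bridge. DemoConnectOp is a hypothetical, much simplified stand-in for Netty's ChannelFuture, and the handshake is passed in as a plain CompletableFuture for the sake of the example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical listener-based operation; stands in for a Netty ChannelFuture.
class DemoConnectOp {
    private BiConsumer<String, Throwable> listener;
    boolean cancelled;

    void addListener(BiConsumer<String, Throwable> listener) { this.listener = listener; }
    void cancel() { cancelled = true; }
    void succeed(String channel) { listener.accept(channel, null); }
    void fail(Throwable cause) { listener.accept(null, cause); }
}

class ConnectBridge {

    // Returns a future that completes once connect AND handshake have both succeeded.
    static CompletableFuture<String> bridge(DemoConnectOp op, CompletableFuture<Void> handshake) {
        CompletableFuture<String> ready = new CompletableFuture<>();

        // Propagate cancellation from the result back to the underlying operation.
        ready.whenComplete((channel, throwable) -> {
            if (throwable instanceof CancellationException) {
                op.cancel();
            }
        });

        op.addListener((channel, cause) -> {
            if (cause != null) {
                ready.completeExceptionally(cause); // the TCP connect itself failed
                return;
            }
            // Connected; now wait for the protocol handshake to finish.
            handshake.whenComplete((ok, throwable) -> {
                if (throwable == null) {
                    ready.complete(channel);
                } else {
                    ready.completeExceptionally(throwable);
                }
            });
        });
        return ready;
    }

    public static void main(String[] args) {
        DemoConnectOp op = new DemoConnectOp();
        CompletableFuture<Void> handshake = new CompletableFuture<>();
        CompletableFuture<String> ready = bridge(op, handshake);
        op.succeed("channel-1");
        System.out.println(ready.isDone()); // prints "false": handshake still pending
        handshake.complete(null);
        System.out.println(ready.join());   // prints "channel-1"
    }
}
```
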