1、从一个问题开始 问题:假设设置失效时间10秒,如果由于某些原因导致10秒还没执行完任务,这时候锁自动失效,导致其他线程也会拿到分布式锁,怎么处理? 答:Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
2、分布式锁的要求
互斥性 任意时刻只能有一个客户端拥有锁,不能同时多个客户端获取
安全性 锁只能被持有该锁的用户删除,而不能被其他用户删除
死锁 获取锁的客户端因为某些原因而宕机,而未能释放锁,其他客户端无法获取此锁,需要有机制来避免该类问题的发生
容错 当部分节点宕机,客户端仍能获取锁或者释放锁
3、Redisson实现的分布式锁 具体使用可以参考Redisson官方文档 这里贴上我简单使用的例子:
3.1、pom 我使用的是springboot,所以直接用了redisson提供的集成包。
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson-spring-boot-starter</artifactId > <version > 3.11.4</version > </dependency >
3.2、application.properties 我用的redis是官方cluster的3主3从。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 spring.redis.password =XXXXXXXXXX spring.redis.cluster.nodes =xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
3.3、test 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public void contextLoads () { RLock lock = redisson.getLock("lock" ); lock.lock(); try { Thread.sleep(80000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
这里再看一看具体获取锁和释放锁的核心逻辑:
3.3.1、获取锁 首先,调用了RedissonLock中的Lock方法:
1 2 3 4 5 6 7 8 @Override public void lock () { try { lock(-1 , null , false ); } catch (InterruptedException e) { throw new IllegalStateException (); } }
注意这里第一个入参为-1。 进入lock方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private void lock (long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { return ; } RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true ) { ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { break ; } if (ttl >= 0 ) { try { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { getEntry(threadId).getLatch().acquire(); } else { getEntry(threadId).getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } }
再看tryAcquire方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private Long tryAcquire (long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync (long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null ) { return ; } if (ttlRemaining == null ) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
因为leaseTime为-1,所以首先异步的获取锁,之后会走看门狗逻辑。 先看获取锁的操作:tryLockInnerAsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
Redisson 使用 EVAL 命令执行上面的 Lua 脚本来完成获取锁的操作:
如果通过 exists 命令发现当前 key 不存在,即锁没被占用,则执行 hset 写入 Hash 类型数据 key:全局锁名称(例如共享资源ID), field:锁实例名称(Redisson客户端ID:线程ID), value:1,并执行 pexpire 对该 key 设置失效时间,返回空值 nil,至此获取锁成功。
如果通过 hexists 命令发现 Redis 中已经存在当前 key 和 field 的 Hash 数据,说明当前线程之前已经获取到锁,因为这里的锁是可重入的,则执行 hincrby 对当前 key field 的值加一,并重新设置失效时间,返回空值,至此重入获取锁成功。
最后是锁已被占用的情况,即当前 key 已经存在,但是 Hash 中的 Field 与当前值不同,则执行 pttl 获取锁的剩余存活时间并返回,至此获取锁失败。
3.3.2、释放锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public void unlock () { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync (long threadId) { RPromise<Void> result = new RedissonPromise <Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { if (e != null ) { cancelExpirationRenewal(threadId); result.tryFailure(e); return ; } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return ; } cancelExpirationRenewal(threadId); result.trySuccess(null ); }); return result; }
上面opStatus为null时,会抛出异常,必须由加锁的线程释放锁。 再来看核心方法:unlockInnerAsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected RFuture<Boolean> unlockInnerAsync (long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
依然使用 EVAL 命令执行 Lua 脚本来释放锁:
key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1。
key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil。
因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一。
释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1。
上面执行结果返回 nil 的情况,因为自己不是锁的持有者,不允许释放别人的锁,故抛出异常。
执行结果返回 1 的情况,该锁的所有实例都已全部释放,所以不需要再刷新锁的失效时间。
上面的代码解析文本源自:Redisson 分布式锁实现分析(一)
3.4、RedLock算法 Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。 Redisson中也实现了这种算法,具体可以参考看8.4章节 这里简单描述一下这种算法: 假设有5个互不连接的Redis集群
获取当前时间,单位毫秒
依次尝试从5个集群中获取相同的锁。当获取锁的时候,客户端设置一个网络连接和超时时间, 这个超时时间应该小于锁的失效时间,如果服务端没有在规定的时间内响应,则尝试另一个redis集群。
客户端使用当前时间减去开始获取锁的时间(第一步记录的时间),得到获取锁使用的时间。 当且仅当一半以上(这里为3)的集群获取到锁,并且使用的时间小于锁失效时间时,才算获取锁成功。
获取到锁之后,设置key真正有效的时间等于有效时间减去获取锁花费的时间。
如果获取锁失败了,客户端应在所有redis集群上进行解锁。
贴一段Redisson的小例子:
1 2 3 4 5 6 7 8 9 10 RLock lock1 = redissonInstance1.getLock("lock1" );RLock lock2 = redissonInstance2.getLock("lock2" );RLock lock3 = redissonInstance3.getLock("lock3" );RedissonRedLock lock = new RedissonRedLock (lock1, lock2, lock3);lock.lock(); ... lock.unlock();
RedissonRedLock继承自RedissonMultiLock,具体源码就不再继续分析了。
参考 《Redis官方文档》用Redis构建分布式锁 8. 分布式锁和同步器 Redisson 分布式锁实现分析(一) Redis 分布式锁的前世今生 Redlock:Redis分布式锁最牛逼的实现