1、设计目标 分布式部署的应用集群中保证数据更新的互斥性,且程序出现异常时,锁能够自动释放,避免死锁发生。
2、为什么要使用分布式锁 为了保证分布式部署的应用集群中同一时间只有一个客户端对共享资源进行操作。根据锁的用途再细分:
对共享资源的操作是幂等性的,使用分布式锁能够避免重复操作,从而提高效率。
对共享资源的操作是非幂等的,比如订单状态的修改,如果多个客户端同时操作,最后的结果可能很遭,使用分布式锁可以让各个客户端分散时间操作。
3、分布式锁应具备哪些条件 这也是分布式锁的关键技术。
互斥性 这个是分布式锁的基本要求,分布式锁需要保证不同客户端的不同线程之间互斥。
可重入性 支持锁的重入,减少资源消耗。
锁超时释放 获取锁的客户端因为某些原因而宕机,而未能释放锁,其他客户端无法获取此锁,锁超时释放是为了避免死锁。
安全性 锁只能被持有该锁的用户删除,而不能被其他用户删除。
高效与高可用 加锁与解锁需要高效,并保证高可用,当部分节点宕机,客户端仍能获取锁或者释放锁。
支持阻塞与非阻塞 阻塞就是线程获取不到锁一直阻塞,增加超时时间可以防止一直阻塞。非阻塞则获取不到锁不阻塞线程。
支持公平与非公平 公平锁就是按照加锁的顺序获取到锁,非公平锁即无序。
4、基于Redis 4.1、 从一个问题开始 问题:假设设置失效时间10秒,如果由于某些原因导致10秒还没执行完任务,这时候锁自动失效,导致其他线程也会拿到分布式锁,怎么处理? 答:Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
4.2、 Redisson实现的分布式锁 具体使用可以参考Redisson官方文档 这里贴上我简单使用的例子:
4.2.1、例子 4.2.1.1、 pom 我使用的是springboot,所以直接用了redisson提供的集成包。
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson-spring-boot-starter</artifactId > <version > 3.11.4</version > </dependency >
4.2.1.2、application.properties 我用的redis是官方cluster的3主3从。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 spring.redis.password =XXXXXXXXXX spring.redis.cluster.nodes =xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
4.2.1.3、 test 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public void contextLoads () { RLock lock = redisson.getLock("lock" ); lock.lock(); try { Thread.sleep(80000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
4.2.2、解析 这里再看一看具体获取锁和释放锁的核心逻辑:
4.2.2.1、 获取锁 首先,调用了RedissonLock中的Lock方法:
1 2 3 4 5 6 7 8 @Override public void lock () { try { lock(-1 , null , false ); } catch (InterruptedException e) { throw new IllegalStateException (); } }
注意这里第一个入参为-1。 进入lock方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private void lock (long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { return ; } RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true ) { ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { break ; } if (ttl >= 0 ) { try { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { getEntry(threadId).getLatch().acquire(); } else { getEntry(threadId).getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } }
再看tryAcquire方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private Long tryAcquire (long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync (long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null ) { return ; } if (ttlRemaining == null ) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
因为leaseTime为-1,所以首先异步的获取锁,之后会走看门狗逻辑。 先看获取锁的操作:tryLockInnerAsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
Redisson 使用 EVAL 命令执行上面的 Lua 脚本来完成获取锁的操作:
如果通过 exists 命令发现当前 key 不存在,即锁没被占用,则执行 hset 写入 Hash 类型数据 key:全局锁名称(例如共享资源ID), field:锁实例名称(Redisson客户端ID:线程ID), value:1,并执行 pexpire 对该 key 设置失效时间,返回空值 nil,至此获取锁成功。
如果通过 hexists 命令发现 Redis 中已经存在当前 key 和 field 的 Hash 数据,说明当前线程之前已经获取到锁,因为这里的锁是可重入的,则执行 hincrby 对当前 key field 的值加一,并重新设置失效时间,返回空值,至此重入获取锁成功。
最后是锁已被占用的情况,即当前 key 已经存在,但是 Hash 中的 Field 与当前值不同,则执行 pttl 获取锁的剩余存活时间并返回,至此获取锁失败。
4.2.2.2、 释放锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public void unlock () { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync (long threadId) { RPromise<Void> result = new RedissonPromise <Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { if (e != null ) { cancelExpirationRenewal(threadId); result.tryFailure(e); return ; } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return ; } cancelExpirationRenewal(threadId); result.trySuccess(null ); }); return result; }
上面opStatus为null时,会抛出异常,必须由加锁的线程释放锁。 再来看核心方法:unlockInnerAsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected RFuture<Boolean> unlockInnerAsync (long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
依然使用 EVAL 命令执行 Lua 脚本来释放锁:
key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1。
key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil。
因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一。
释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1。
上面执行结果返回 nil 的情况,因为自己不是锁的持有者,不允许释放别人的锁,故抛出异常。
执行结果返回 1 的情况,该锁的所有实例都已全部释放,所以不需要再刷新锁的失效时间。
上面的代码解析文本源自:Redisson 分布式锁实现分析(一)
4.3、RedLock算法 Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。 Redisson中也实现了这种算法,具体可以参考看8.4章节 这里简单描述一下这种算法: 假设有5个互不连接的Redis集群
获取当前时间,单位毫秒
依次尝试从5个集群中获取相同的锁。当获取锁的时候,客户端设置一个网络连接和超时时间, 这个超时时间应该小于锁的失效时间,如果服务端没有在规定的时间内响应,则尝试另一个redis集群。
客户端使用当前时间减去开始获取锁的时间(第一步记录的时间),得到获取锁使用的时间。 当且仅当一半以上(这里为3)的集群获取到锁,并且使用的时间小于锁失效时间时,才算获取锁成功。
获取到锁之后,设置key真正有效的时间等于有效时间减去获取锁花费的时间。
如果获取锁失败了,客户端应在所有redis集群上进行解锁。
贴一段Redisson的小例子:
1 2 3 4 5 6 7 8 9 10 RLock lock1 = redissonInstance1.getLock("lock1" );RLock lock2 = redissonInstance2.getLock("lock2" );RLock lock3 = redissonInstance3.getLock("lock3" );RedissonRedLock lock = new RedissonRedLock (lock1, lock2, lock3);lock.lock(); ... lock.unlock();
RedissonRedLock继承自RedissonMultiLock,具体源码就不再继续分析了。
5、基于Zookeeper 5.1、节点类型 Zookeeper是一个一致性的文件系统,保证了其每个节点的唯一性。 有4种节点类型:
持久化目录节点 客户端与Zookeeper断开后,该节点依旧存在。
持久化顺序编号目录节点 保持持久化目录节点的特性外,每个节点的名称会被顺序编号。
临时目录节点 客户端与Zookeeper断开后,该节点被删除。
临时顺序标号目录节点 保持临时目录节点的特性外,每个节点的名称会被顺序编号。
顺序号是单调递增的计数器,由父节点维护。
5.2、分布式锁原理 分布式锁就是利用了Zookeeper的临时顺序标号目录节点的原理来实现。Locks主节点下面的ID最小的节点获得锁的权限,其他客户端来获取锁时,发现自己不是最靠前的,会监视他的前一个锁节点,当锁释放时,相应的节点被删除,会通知这个等待的客户端,让其获取锁的权利,相当于形成了一个等待队列。
Zookeeper分布式锁的优点就是有现成的框架可以拿来就用,因为有等待队列,枪锁的效率也会高。缺点是因为Zookeeper是类似文件系统的数据结构,所以删除和新增节点的效率会比较低。
参考 《Redis官方文档》用Redis构建分布式锁 8. 分布式锁和同步器 Redisson 分布式锁实现分析(一) Redis 分布式锁的前世今生 Redlock:Redis分布式锁最牛逼的实现