1、从一个问题开始

问题:假设设置失效时间10秒,如果由于某些原因导致10秒还没执行完任务,这时候锁自动失效,导致其他线程也会拿到分布式锁,怎么处理?
答:Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。

2、分布式锁的要求

  • 互斥性
    任意时刻只能有一个客户端拥有锁,不能同时多个客户端获取
  • 安全性
    锁只能被持有该锁的用户删除,而不能被其他用户删除
  • 死锁
    获取锁的客户端因为某些原因而宕机,而未能释放锁,其他客户端无法获取此锁,需要有机制来避免该类问题的发生
  • 容错
    当部分节点宕机,客户端仍能获取锁或者释放锁

3、Redisson实现的分布式锁

具体使用可以参考Redisson官方文档
这里贴上我简单使用的例子:

3.1、pom

我使用的是springboot,所以直接用了redisson提供的集成包。

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.4</version>
</dependency>

3.2、application.properties

我用的redis是官方cluster的3主3从。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# common spring boot settings

#spring.redis.database=
#spring.redis.host=
#spring.redis.port=
spring.redis.password=XXXXXXXXXX
#spring.redis.ssl=
#spring.redis.timeout=
spring.redis.cluster.nodes=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#spring.redis.sentinel.master=
#spring.redis.sentinel.nodes=

# Redisson settings

#path to redisson.yaml or redisson.json
#spring.redis.redisson.config=classpath:redisson.yaml

3.3、test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 public void contextLoads() {
//简单使用测试
// RBucket<String> bucket = redisson.getBucket("bucket");
// bucket.set("test");
// String obj = bucket.get();
// System.out.println(obj);

// 获得锁对象实例
RLock lock = redisson.getLock("lock");

// 获取分布式锁,采用默认超时时间30秒
// 如果负责储存这个分布式锁的Redisson节点宕机以后,
// 而且这个锁正好处于锁住的状态时,
// 这个锁会出现锁死的状态。
// 为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,
// 它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
// 默认情况下,看门狗的检查锁的超时时间是30秒钟,
// 也可以通过修改Config.lockWatchdogTimeout来另行指定
lock.lock();
try {
Thread.sleep(80000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
// 这种指定了超时时间的锁不会走看门狗逻辑,
// 即会发生任务没有执行完成时,锁超时了,其他进程会获取到这个分布式锁。
// 尽量使用第一种方式,走看门狗逻辑。
// lock.lock(40, TimeUnit.SECONDS);
// try {
// Thread.sleep(80000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}

这里再看一看具体获取锁和释放锁的核心逻辑:

3.3.1、获取锁

首先,调用了RedissonLock中的Lock方法:

1
2
3
4
5
6
7
8
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}

注意这里第一个入参为-1。
进入lock方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//获取锁逻辑
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
//获取成功,返回
return;
}

//订阅锁
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);

try {
//持续获取锁
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}

// waiting for message
if (ttl >= 0) {
try {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
//最终取消订阅获取锁
unsubscribe(future, threadId);
}
}

再看tryAcquire方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//获取锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//看门狗逻辑
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}

因为leaseTime为-1,所以首先异步的获取锁,之后会走看门狗逻辑。
先看获取锁的操作:tryLockInnerAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

Redisson 使用 EVAL 命令执行上面的 Lua 脚本来完成获取锁的操作:

  • 如果通过 exists 命令发现当前 key 不存在,即锁没被占用,则执行 hset 写入 Hash 类型数据 key:全局锁名称(例如共享资源ID), field:锁实例名称(Redisson客户端ID:线程ID), value:1,并执行 pexpire 对该 key 设置失效时间,返回空值 nil,至此获取锁成功。
  • 如果通过 hexists 命令发现 Redis 中已经存在当前 key 和 field 的 Hash 数据,说明当前线程之前已经获取到锁,因为这里的锁是可重入的,则执行 hincrby 对当前 key field 的值加一,并重新设置失效时间,返回空值,至此重入获取锁成功。
  • 最后是锁已被占用的情况,即当前 key 已经存在,但是 Hash 中的 Field 与当前值不同,则执行 pttl 获取锁的剩余存活时间并返回,至此获取锁失败。

3.3.2、释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  @Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);

future.onComplete((opStatus, e) -> {
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}

if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}

cancelExpirationRenewal(threadId);
result.trySuccess(null);
});

return result;
}

上面opStatus为null时,会抛出异常,必须由加锁的线程释放锁。
再来看核心方法:unlockInnerAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

依然使用 EVAL 命令执行 Lua 脚本来释放锁:

  • key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1。
  • key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil。
  • 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一。
  • 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1。
  • 上面执行结果返回 nil 的情况,因为自己不是锁的持有者,不允许释放别人的锁,故抛出异常。
  • 执行结果返回 1 的情况,该锁的所有实例都已全部释放,所以不需要再刷新锁的失效时间。

上面的代码解析文本源自:Redisson 分布式锁实现分析(一)

3.4、RedLock算法

Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。
Redisson中也实现了这种算法,具体可以参考看8.4章节
这里简单描述一下这种算法:
假设有5个互不连接的Redis集群

  • 获取当前时间,单位毫秒
  • 依次尝试从5个集群中获取相同的锁。当获取锁的时候,客户端设置一个网络连接和超时时间,
    这个超时时间应该小于锁的失效时间,如果服务端没有在规定的时间内响应,则尝试另一个redis集群。
  • 客户端使用当前时间减去开始获取锁的时间(第一步记录的时间),得到获取锁使用的时间。
    当且仅当一半以上(这里为3)的集群获取到锁,并且使用的时间小于锁失效时间时,才算获取锁成功。
  • 获取到锁之后,设置key真正有效的时间等于有效时间减去获取锁花费的时间。
  • 如果获取锁失败了,客户端应在所有redis集群上进行解锁。

贴一段Redisson的小例子:

1
2
3
4
5
6
7
8
9
10
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

RedissonRedLock继承自RedissonMultiLock,具体源码就不再继续分析了。

参考

《Redis官方文档》用Redis构建分布式锁
8. 分布式锁和同步器
Redisson 分布式锁实现分析(一)
Redis 分布式锁的前世今生
Redlock:Redis分布式锁最牛逼的实现

tencent.jpg