若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1);
if (counter > 0) then
– 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call(‘pexpire’, KEYS[1], ARGV[2]);
return 0;
else
– 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call(‘del’, KEYS[1]);
redis.call(‘publish’, KEYS[2], ARGV[1]);
return 1;
end;

return nil;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

![1623730580929-4345f89f-0fae-4be2-a82e-ed9197b8cb04.png](https://cdn.chucz.asia/blog/3703820-20251013003740771-810740276.png)

<font style="color:rgb(77, 77, 77);">广播解锁消息有什么用? </font>**<font style="color:rgb(77, 77, 77);">是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。</font>**

<font style="color:rgb(77, 77, 77);">返回值0、1、nil有什么不一样? </font>**<font style="color:rgb(77, 77, 77);">当且仅当返回1,才表示当前请求真正触发了解锁Lua脚本</font>**



### 加锁&解锁流程串起来
![1623736844073-f4981c0d-2b96-41ca-b73f-469c5fbe3c0a.png](https://cdn.chucz.asia/blog/3703820-20251013003740979-674631236.png)



<font style="color:#F5222D;">1、线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。</font>

<font style="color:#F5222D;"></font>

<font style="color:#F5222D;">2、线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;</font>

<font style="color:#F5222D;"></font>

<font style="color:#F5222D;">3、解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁</font>



### 订阅频道名称(如:redisson_lock__channel:{my_first_lock_name})为什么有大括号? 

1. 在redis集群方案中,如果Lua脚本涉及多个key的操作,则需限制这些key在同一个slot中,才能保障Lua脚本执行的原子性。否则运行会报错Lua script attempted to access a non local key in a cluster node . channel; 


2. HashTag是用{}包裹key的一个子串,若设置了HashTag,集群会根据HashTag决定key分配到哪个slot;HashTag不支持嵌套,只有第一个左括号{和第一个右括号}里面的内容才当做HashTag参与slot计算;通常,客户端都会封装这个计算逻辑。



// 见org.redisson.cluster.ClusterConnectionManager#calcSlot




```java
@Override
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}

int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
  1. 在解锁Lua脚本中,操作了两个key:一个是锁名my_lock_name,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name},按照上面slot计算方式,两个key都会按照内容my_first_lock_name来计算,故能保证落到同一个slot

redisson代码几乎都是以Lua脚本方式与redis服务端交互,如何跟踪这些脚本执行过程?

启动一个redis客户端终端,执行monitor命令以便在终端上实时打印 redis 服务器接收到的命令;然后debug执行redisson加锁/解锁测试用例,即可看到代码运行过程中实际执行了哪些Lua脚本

eg:上面整体流程示意图的测试用例位:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
@RunWith(SpringRunner.class)  
@SpringBootTest
public class RedissonDistributedLockerTest {
private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLocker.class);

@Resource
private DistributedLocker distributedLocker;

private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();

private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();

@Test
public void tryLockUnlockCost() throws Exception {
StopWatch stopWatch = new StopWatch("加锁解锁耗时统计");
stopWatch.start();
for (int i = 0; i < 10000; i++) {
String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
Assert.assertTrue(optLocked.isPresent());
optLocked.get().unlock();
}
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}

@Test
public void tryLock() throws Exception {
String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
Assert.assertTrue(optLocked.isPresent());

Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
Assert.assertTrue(optLocked2.isPresent());

optLocked.get().unlock();
}

/**
* 模拟2个线程争抢锁:A先获取到锁,A释放锁后,B再获得锁
*/
@Test
public void tryLock2() throws Exception {
String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
CountDownLatch countDownLatch = new CountDownLatch(1);
Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {
countDownLatch.await();
log.info("B尝试获得锁:thread={}", currentThreadId());
return distributedLocker.tryLock(key, 600000, 600000);
}
);

log.info("A尝试获得锁:thread={}", currentThreadId());
Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
Assert.assertTrue(optLocked.isPresent());

log.info("A已获得锁:thread={}", currentThreadId());
countDownLatch.countDown();

optLocked.get().unlock();
log.info("A已释放锁:thread={}", currentThreadId());

Optional<LockResource> lockResource2 = submit.get();
Assert.assertTrue(lockResource2.isPresent());

executorServiceB.submit(() -> {
log.info("B已获得锁:thread={}", currentThreadId());
lockResource2.get().unlock();
log.info("B已释放锁:thread={}", currentThreadId());
});
}

/**
* 模拟3个线程争抢锁:A先获取到锁,A释放锁后,B和C同时争抢锁
*/
@Test
public void tryLock3() throws Exception {
String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");

log.info("A尝试获得锁:thread={}", currentThreadId());
Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);

if (optLocked.isPresent()) {
log.info("A已获得锁:thread={}", currentThreadId());
}
Assert.assertTrue(optLocked.isPresent());

CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
cyclicBarrier.await();
log.info("B尝试获得锁:thread={}", currentThreadId());
return distributedLocker.tryLock(key, 600000, 600000);
}
);

Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
cyclicBarrier.await();
log.info("C尝试获得锁:thread={}", currentThreadId());
return distributedLocker.tryLock(key, 600000, 600000);
}
);

optLocked.get().unlock();
log.info("A已释放锁:thread={}", currentThreadId());

CountDownLatch countDownLatch = new CountDownLatch(2);
executorServiceB.submit(() -> {
log.info("B已获得锁:thread={}", currentThreadId());
try {
submitB.get().get().unlock();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
log.info("B已释放锁:thread={}", currentThreadId());
countDownLatch.countDown();
});

executorServiceC.submit(() -> {
log.info("C已获得锁:thread={}", currentThreadId());
try {
submitC.get().get().unlock();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
log.info("C已释放锁:thread={}", currentThreadId());
countDownLatch.countDown();
});

countDownLatch.await();
}

private static Long currentThreadId() {
return Thread.currentThread().getId();
}

@Test
public void tryLockWaitTimeout() throws Exception {
String key = "mock-key:" + UUID.randomUUID().toString();

Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
Assert.assertTrue(optLocked.isPresent());

Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
long now = System.currentTimeMillis();
Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
long cost = System.currentTimeMillis() - now;
log.info("cost={}", cost);
return optLockedAgain;
}).exceptionally(th -> {
log.error("Exception: ", th);
return Optional.empty();
}).join();

Assert.assertTrue(!optLockResource.isPresent());
}

@Test
public void tryLockWithLeaseTime() throws Exception {
String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
Assert.assertTrue(optLocked.isPresent());

// 可重入
Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
Assert.assertTrue(optLockedAgain.isPresent());
}

/**
* 模拟1000个并发请求枪一把锁
*/
@Test
public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
int totalThread = 1000;
String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
AtomicInteger acquiredLockTimes = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(totalThread);
for (int i = 0; i < totalThread; i++) {
executor.submit(new Runnable() {

@Override
public void run() {
tryAcquireLockTimes.getAndIncrement();
Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
if (optLocked.isPresent()) {
acquiredLockTimes.getAndIncrement();
}
}
});
}
executor.awaitTermination(15, TimeUnit.SECONDS);

Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
Assert.assertTrue(acquiredLockTimes.get() == 1);
}

@Test
public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
int totalThread = 100;
String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
AtomicInteger acquiredLockTimes = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(totalThread);
for (int i = 0; i < totalThread; i++) {
executor.submit(new Runnable() {

@Override
public void run() {
long now = System.currentTimeMillis();
Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
long cost = System.currentTimeMillis() - now;
log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
if (optLocked.isPresent()) {
acquiredLockTimes.getAndIncrement();
// 主动释放锁
optLocked.get().unlock();
}
}
});
}
executor.awaitTermination(20, TimeUnit.SECONDS);

log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
Assert.assertTrue(acquiredLockTimes.get() == totalThread);
}
}

public interface DistributedLocker {
Optional<LockResource> tryLock(String lockKey, int waitTime);

Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
public interface LockResource {
void unlock();
}
}

执行的Lua脚本如下:

加锁:redissonClient.getLock(“my_first_lock_name”).tryLock(600000, 600000); 
解锁:redissonClient.getLock(“my_first_lock_name”).unlock();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 线程A
## 1.1.1尝试获取锁 -> 成功
1568357723.205362 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357723.205452 [0 lua] "exists" "my_first_lock_name"
1568357723.208858 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "1"
1568357723.208874 [0 lua] "pexpire" "my_first_lock_name" "600000"


# 线程B
### 2.1.1尝试获取锁,未获取到,返回锁剩余过期时间
1568357773.338018 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338161 [0 lua] "exists" "my_first_lock_name"
1568357773.338177 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338197 [0 lua] "pttl" "my_first_lock_name"


## 2.1.1.3 添加订阅(非Lua脚本) -> 订阅成功
1568357799.403341 [0 127.0.0.1:56421] "SUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"


## 2.1.1.4 再次尝试获取锁 -> 未获取到,返回锁剩余过期时间
1568357830.683631 [0 127.0.0.1:56418] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684371 [0 lua] "exists" "my_first_lock_name"
1568357830.684428 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684485 [0 lua] "pttl" "my_first_lock_name"


# 线程A
## 3.1.1 释放锁并广播解锁消息,0代表解锁消息
1568357922.122454 [0 127.0.0.1:56420] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123645 [0 lua] "exists" "my_first_lock_name"
1568357922.123701 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123741 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "-1"
1568357922.123775 [0 lua] "del" "my_first_lock_name"
1568357922.123799 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"


# 线程B
## 监听到解锁消息消息 -> 释放信号量,阻塞被解除;4.1.1.1 再次尝试获取锁 -> 获取成功
1568357975.015206 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357975.015579 [0 lua] "exists" "my_first_lock_name"
1568357975.015633 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "1"
1568357975.015721 [0 lua] "pexpire" "my_first_lock_name" "600000"

## 4.1.1.3 取消订阅(非Lua脚本)
1568358031.185226 [0 127.0.0.1:56421] "UNSUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"


# 线程B
## 5.1.1 释放锁并广播解锁消息
1568358255.551896 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552125 [0 lua] "exists" "my_first_lock_name"
1568358255.552156 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552200 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "-1"
1568358255.552258 [0 lua] "del" "my_first_lock_name"
1568358255.552304 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"

需要特别注意的是,RedissonLock 同样没**有解决 节点挂掉的时候,存在丢失锁的风险的问题**。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法的 RedissonRedLock,RedissonRedLock 真正解决了单点失败的问题,代价是需要额外的为 RedissonRedLock 搭建Redis环境。

所以,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock。

redlock算法

在分布式版本的算法里我们**假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调机制。**

把N设成5,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。
  2. 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个尝试从某个Reids实例获取锁的最大等待时间(超过这个时间,则立马询问下一个实例),这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁消耗的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的总耗时小于锁失效时间时,锁才算获取成功。
  4. 如果取到了锁,key的真正有效时间 = 有效时间(获取锁时设置的key的自动超时时间) - 获取锁的总耗时(询问各个Redis实例的总耗时之和)(步骤3计算的结果)。
  5. 如果因为某些原因,最终获取锁失败(即没有在至少 “N/2+1 ”个Redis实例取到锁或者“获取锁的总耗时”超过了“有效时间”),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,这样可以防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

用 Redisson 实现分布式锁(红锁 RedissonRedLock)及源码分析(实现三)

这里以三个单机模式为例,需要特别注意的是他们完全互相独立,不存在主从复制或者其他集群协调机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);

Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);

/**
* 获取多个 RLock 对象
*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);

/**
* 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

try {
/**
* 4.尝试获取锁
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
}
} catch (Exception e) {
throw new RuntimeException("aquire lock fail");
}finally{
//无论如何, 最后都要解锁
redLock.unlock();
}

最核心的变化就是需要构建多个 RLock ,然后根据多个 RLock 构建成一个 RedissonRedLock,因为 redLock 算法是建立在多个互相独立的 Redis 环境之上的(为了区分可以叫为 Redission node),Redission node 节点既**可以是单机模式(single),也可以是主从模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。**这就意味着,不能跟以往这样只搭建 1个 cluster、或 1个 sentinel 集群,或是1套主从架构就了事了,需要为 RedissonRedLock 额外搭建多几套独立的 Redission 节点。 比如可以搭建3个 或者5个 Redission节点,具体可看视资源及业务情况而定。

下图是一个利用多个 Redission node 最终 组成 RedLock分布式锁的例子,需要特别注意的是每个 Redission node 是互相独立的,不存在任何复制或者其他隐含的分布式协调机制。

1623736561620-f58b10de-da4d-416d-89d1-ed02cd1154f0.png

Redisson 实现redlock算法源码分析(RedLock)

加锁核心代码

org.redisson.RedissonMultiLock#tryLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}

long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
/**
* 1. 允许加锁失败节点个数限制(N-(N/2+1))
*/
int failedLocksLimit = failedLocksLimit();
/**
* 2. 遍历所有节点通过EVAL命令执行lua加锁
*/
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
/**
* 3.对节点尝试加锁
*/
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}

if (lockAcquired) {
/**
*4. 如果获取到锁则添加到已获取锁集合中
*/
acquiredLocks.add(lock);
} else {
/**
* 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
*/
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}

if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}

/**
* 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
*/
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}

if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}

for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}

/**
* 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
*/
return true;
}

[

](https://blog.csdn.net/w372426096/article/details/103761286)

jedis和spring-boot-starter-data-redis(lettuce)的性能测试比较

https://blog.csdn.net/houpeibin2012/article/details/105839651

https://blog.csdn.net/qq_40925189/article/details/109580439

redission文档

https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

分布式锁

https://blog.csdn.net/qq_33363618/article/details/88783766

分布式锁比较

https://www.developers.pub/article/289

https://www.rongsoft.com/article/2020/12/1209462687/