Redis实现分布式锁的正确姿势 | Spring Cloud 36
一、分布式锁
1.1 什么是分布式锁
分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。
1.2 分布式锁应该具备哪些条件
-
在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
-
高可用的获取锁与释放锁
-
高性能的获取锁与释放锁
-
具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
-
具备锁失效机制,即自动解锁,防止死锁
-
具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
1.3 分布式锁的实现方式
-
基于数据库实现分布式锁
-
基于
Zookeeper
实现分布式锁 -
基于
Reids
实现分布式锁
本章重点讲解的是基于
Reids
的分布式锁
二、利用RedisTemplate实现分布式锁
在真实的项目里,我们经常会使用Spring Boot/Spring Cloud
框架,该框架已经集成了 RedisTemplate
类,开放了很多对Redis
的API
。
本章节列举基于
RedisTemplate
实现方式分布式锁的各种方式及存在的问题。
2.1 利用setIfAbsent先设置key value再设置expire
redisTemplate.opsForValue().setIfAbsent(key,value);
redisTemplate.expire(key, time, TimeUnit.SECONDS);
问题:「不是原子操作」。以上两条语句不是原子性的。假如执行完第一条语句后,服务挂掉,导致
key
永久存在,锁无法释放。
setIfAbsent(key, value)
方法简介:如果key
不存在则新增,返回true
;存在则不改变已经有的值返回false
。
2.2 利用setIfAbsent同时设置 key value expire
redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS);
问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行。
问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完。
2.3 利用setIfAbsent同时设置 key value expire,value为客户端标识
可以在获取锁,解锁时。首先获取客户端标识,如果和加锁时不一致,则获解锁操作失败,解决了2.2
中提到了「锁被别的线程误删」
问题。
Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, threadId, 10, TimeUnit.SECONDS))) {result = threadId;try {// 模拟业务处理Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {if (redisTemplate.opsForValue().get(key).equals(threadId)) {redisTemplate.delete(key);}}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}
问题:「不是原子操作」。通过
key
获取加锁时的客户端标识和释放锁两条语句不是原子操作。
2.4 利用lua脚本进行加锁及释放锁原子操作
本章重点,通过
lua
脚本对2.3
提出的原子性问题进行解决。
RedisTemplate
执行lua
脚本加锁释放锁工具类:
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;@Component
public class ConcurrentLockUtil {private static final String LOCK_LUA = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('expire', KEYS[1], ARGV[2]) return 'true' else return 'false' end";private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return 'true' ";private final RedisScript lockRedisScript;private final RedisScript unLockRedisScript;private final RedisSerializer<String> argsSerializer;private final RedisSerializer<String> resultSerializer;private RedisTemplate redisTemplate;public ConcurrentLockUtil(RedisTemplate redisTemplate) {this.argsSerializer = new StringRedisSerializer();this.resultSerializer = new StringRedisSerializer();this.lockRedisScript = RedisScript.of(LOCK_LUA, String.class);this.unLockRedisScript = RedisScript.of(UNLOCK_LUA, String.class);this.redisTemplate = redisTemplate;}/*** 分布式锁** @param lockKey* @param value* @param time* @return*/public boolean lock(String lockKey, String value, long time) {List<String> keys = Collections.singletonList(lockKey);String flag = (String) redisTemplate.execute(lockRedisScript, argsSerializer, resultSerializer, keys, value, String.valueOf(time));return Boolean.valueOf(flag);}/*** 删除锁** @param lock* @param val*/public void unlock(String lock, String val) {List<String> keys = Collections.singletonList(lock);redisTemplate.execute(unLockRedisScript, argsSerializer, resultSerializer, keys, val);}
}
业务调用:
@Autowired
ConcurrentLockUtil concurrentLockUtil;ExecutorService executorService = Executors.newFixedThreadPool(10);
String key = "test2-lock";Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(concurrentLockUtil.lock(key, threadId, 10))) {result = threadId;try {Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {concurrentLockUtil.unlock(key, threadId);}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}
问题:针对
2.2
章节提到的「锁过期释放了,业务还没执行完」问题仍然存在。
有些小伙伴认为,稍微把锁过期时间设置长一些就可以。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放,请见下面章节。
三、利用Redisson框架RLock实现分布式锁
开源框架Redisson
解决了2.1.4
提出的问题。官网地址:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0
我们一起来看下Redisson
底层原理图:
只要线程一加锁成功,就会启动一个watch dog
看门狗,它是一个后台线程,会每隔10
秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key
的生存时间(Redisson
中使用的 Lua
脚本做的检查及设置过期时间操作,本身是原子性的)。因此,解决了「锁过期释放,业务没执行完」
问题。
3.1 加锁逻辑
Redisson
中 Lua
加锁脚本定义及流程如下:
3.2 解锁逻辑
Redisson
中 Lua
解锁脚本定义及流程如下:
3.3 续锁逻辑
可以看到续时方法将 threadId
当作标识符进行续时
知道核心理念就好了, 没必要研究每一行代码
四、利用RedissonRedLock多机实现分布式锁
4.1 单机版锁存在的问题
前面章节的几种方案都只是基于单机版的讨论,还不是很完美。其实Redis
一般都是集群部署的:
如果线程一在Redis
的master
节点上拿到了锁,但是加锁的key
还没同步到slave
节点。恰好这时,master
节点发生故障,一个slave
节点就会升级为master
节点。线程二就可能获取同个key
的锁,但线程一也已经拿到锁了,锁的安全性就没了。
4.2 RedissonRedLock核心思想
为了解决上述问题,Redis
作者 antirez
提出一种高级的分布式锁算法:Redlock
。Redlock
核心思想是这样的:
搞多个
Redis
master
部署,以保证它们不会同时宕掉。并且这些master
节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master
实例上,是与在Redis
单实例,使用相同方法来获取和释放锁。
RedissonRedLock
算法:
RedissonRedLock
业务逻辑实现:
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);String lockKey = "myLock";
int waitTimeout = 5;
int leaseTime = 30;/*** 获取多个 RLock 对象*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);/*** 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** 4.尝试获取锁* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = redLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS);if (res) {//成功获得锁,在这里处理业务}
} catch (Exception e) {throw new RuntimeException("aquire lock fail");
} finally {//无论如何, 最后都要解锁redLock.unlock();
}
RedissonRedLock
核心源码:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {newLeaseTime = unit.toMillis(waitTime)*2;}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);/*** 1. 允许加锁失败节点个数限制(N-(N/2+1))*/int failedLocksLimit = failedLocksLimit();/*** 2. 遍历所有节点通过EVAL命令执行lua加锁*/List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;/*** 3.对节点尝试加锁*/try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 抛出异常表示获取锁失败lockAcquired = false;}if (lockAcquired) {/***4. 如果获取到锁则添加到已获取锁集合中*/acquiredLocks.add(lock);} else {/*** 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功*/if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1 && leaseTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}/*** 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false*/if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}/*** 7.如果逻辑正常执行完则认为最终申请锁成功,返回true*/return true;
}
当然,对于 RedissonRedLock
算法不是没有质疑声, 大家可以去 Redis
官网查看Martin Kleppmann
与 Redis
作者Antirez
的辩论。