> 文章列表 > Redis实现分布式锁的正确姿势 | Spring Cloud 36

Redis实现分布式锁的正确姿势 | Spring Cloud 36

Redis实现分布式锁的正确姿势 | Spring Cloud 36

一、分布式

1.1 什么是分布式锁

分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。

1.2 分布式锁应该具备哪些条件

  • 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行

  • 高可用的获取锁与释放锁

  • 高性能的获取锁与释放锁

  • 具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)

  • 具备锁失效机制,即自动解锁,防止死锁

  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

1.3 分布式锁的实现方式

  • 基于数据库实现分布式锁

  • 基于Zookeeper实现分布式锁

  • 基于Reids实现分布式锁

本章重点讲解的是基于Reids的分布式锁

二、利用RedisTemplate实现分布式锁

在真实的项目里,我们经常会使用Spring Boot/Spring Cloud框架,该框架已经集成了 RedisTemplate 类,开放了很多对RedisAPI

本章节列举基于RedisTemplate实现方式分布式锁的各种方式及存在的问题。

2.1 利用setIfAbsent先设置key value再设置expire

redisTemplate.opsForValue().setIfAbsent(key,value);
redisTemplate.expire(key, time, TimeUnit.SECONDS);

问题:「不是原子操作」。以上两条语句不是原子性的。假如执行完第一条语句后,服务挂掉,导致key永久存在,锁无法释放。

setIfAbsent(key, value)方法简介:如果key不存在则新增,返回 true;存在则不改变已经有的值返回 false

2.2 利用setIfAbsent同时设置 key value expire

redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS);

问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行。

问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完。

2.3 利用setIfAbsent同时设置 key value expire,value为客户端标识

可以在获取锁,解锁时。首先获取客户端标识,如果和加锁时不一致,则获解锁操作失败,解决了2.2中提到了「锁被别的线程误删」问题。

Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, threadId, 10, TimeUnit.SECONDS))) {result = threadId;try {// 模拟业务处理Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {if (redisTemplate.opsForValue().get(key).equals(threadId)) {redisTemplate.delete(key);}}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}

问题:「不是原子操作」。通过key获取加锁时的客户端标识和释放锁两条语句不是原子操作。

2.4 利用lua脚本进行加锁及释放锁原子操作

本章重点,通过lua脚本对2.3提出的原子性问题进行解决。

RedisTemplate执行lua脚本加锁释放锁工具类:

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;@Component
public class ConcurrentLockUtil {private static final String LOCK_LUA = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('expire', KEYS[1], ARGV[2]) return 'true' else return 'false' end";private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return 'true' ";private final RedisScript lockRedisScript;private final RedisScript unLockRedisScript;private final RedisSerializer<String> argsSerializer;private final RedisSerializer<String> resultSerializer;private RedisTemplate redisTemplate;public ConcurrentLockUtil(RedisTemplate redisTemplate) {this.argsSerializer = new StringRedisSerializer();this.resultSerializer = new StringRedisSerializer();this.lockRedisScript = RedisScript.of(LOCK_LUA, String.class);this.unLockRedisScript = RedisScript.of(UNLOCK_LUA, String.class);this.redisTemplate = redisTemplate;}/*** 分布式锁** @param lockKey* @param value* @param time* @return*/public boolean lock(String lockKey, String value, long time) {List<String> keys = Collections.singletonList(lockKey);String flag = (String) redisTemplate.execute(lockRedisScript, argsSerializer, resultSerializer, keys, value, String.valueOf(time));return Boolean.valueOf(flag);}/*** 删除锁** @param lock* @param val*/public void unlock(String lock, String val) {List<String> keys = Collections.singletonList(lock);redisTemplate.execute(unLockRedisScript, argsSerializer, resultSerializer, keys, val);}
}

业务调用:

@Autowired
ConcurrentLockUtil concurrentLockUtil;ExecutorService executorService = Executors.newFixedThreadPool(10);
String key = "test2-lock";Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(concurrentLockUtil.lock(key, threadId, 10))) {result = threadId;try {Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {concurrentLockUtil.unlock(key, threadId);}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}

问题:针对2.2章节提到的「锁过期释放了,业务还没执行完」问题仍然存在。

有些小伙伴认为,稍微把锁过期时间设置长一些就可以。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放,请见下面章节。

三、利用Redisson框架RLock实现分布式锁

开源框架Redisson解决了2.1.4提出的问题。官网地址:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0

我们一起来看下Redisson底层原理图:

Redis实现分布式锁的正确姿势 | Spring Cloud 36

只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key的生存时间(Redisson 中使用的 Lua 脚本做的检查及设置过期时间操作,本身是原子性的)。因此,解决了「锁过期释放,业务没执行完」问题。

3.1 加锁逻辑

RedissonLua 加锁脚本定义及流程如下:

Redis实现分布式锁的正确姿势 | Spring Cloud 36

Redis实现分布式锁的正确姿势 | Spring Cloud 36

3.2 解锁逻辑

RedissonLua 解锁脚本定义及流程如下:

Redis实现分布式锁的正确姿势 | Spring Cloud 36

Redis实现分布式锁的正确姿势 | Spring Cloud 36

3.3 续锁逻辑

Redis实现分布式锁的正确姿势 | Spring Cloud 36

可以看到续时方法将 threadId 当作标识符进行续时

Redis实现分布式锁的正确姿势 | Spring Cloud 36

知道核心理念就好了, 没必要研究每一行代码

Redis实现分布式锁的正确姿势 | Spring Cloud 36

四、利用RedissonRedLock多机实现分布式锁

4.1 单机版锁存在的问题

前面章节的几种方案都只是基于单机版的讨论,还不是很完美。其实Redis一般都是集群部署的:

Redis实现分布式锁的正确姿势 | Spring Cloud 36

如果线程一在Redismaster节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。线程二就可能获取同个key的锁,但线程一也已经拿到锁了,锁的安全性就没了。

4.2 RedissonRedLock核心思想

为了解决上述问题,Redis作者 antirez提出一种高级的分布式锁算法:RedlockRedlock核心思想是这样的:

搞多个Redis master部署,以保证它们不会同时宕掉。并且这些master节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master实例上,是与在Redis单实例,使用相同方法来获取和释放锁。

Redis实现分布式锁的正确姿势 | Spring Cloud 36
RedissonRedLock算法:
Redis实现分布式锁的正确姿势 | Spring Cloud 36

RedissonRedLock业务逻辑实现:

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);String lockKey = "myLock";
int waitTimeout = 5;
int leaseTime = 30;/*** 获取多个 RLock 对象*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);/*** 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** 4.尝试获取锁* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = redLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS);if (res) {//成功获得锁,在这里处理业务}
} catch (Exception e) {throw new RuntimeException("aquire lock fail");
} finally {//无论如何, 最后都要解锁redLock.unlock();
}

RedissonRedLock核心源码:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {newLeaseTime = unit.toMillis(waitTime)*2;}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);/*** 1. 允许加锁失败节点个数限制(N-(N/2+1))*/int failedLocksLimit = failedLocksLimit();/*** 2. 遍历所有节点通过EVAL命令执行lua加锁*/List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;/***  3.对节点尝试加锁*/try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 抛出异常表示获取锁失败lockAcquired = false;}if (lockAcquired) {/***4. 如果获取到锁则添加到已获取锁集合中*/acquiredLocks.add(lock);} else {/*** 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功*/if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1 && leaseTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}/*** 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false*/if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}/*** 7.如果逻辑正常执行完则认为最终申请锁成功,返回true*/return true;
}

当然,对于 RedissonRedLock 算法不是没有质疑声, 大家可以去 Redis 官网查看Martin KleppmannRedis 作者Antirez 的辩论。