> 文章列表 > redis实战---分布式锁--实战篇

redis实战---分布式锁--实战篇

redis实战---分布式锁--实战篇

分布式锁实战

  • 故事背景
  • 问题复现
  • 解决方案
    • 自己手动实现
      • 代码
      • 压测结果
      • 代码重点解释
    • 基于Redisson进行实现
      • 引入依赖
      • 代码使用
      • 运行结果
      • 源码解析
  • 总结&升华

故事背景

上文讲到我们使用synchronized实现了jvm级别的加锁。同时抛出了在分布式环境下,我们的代码会出现的问题。这篇文章,将会带着大家去解决这个问题。带着大家一起实现redis的分布式锁。
redis实战---分布式锁--实战篇

问题复现

  1. 官网上下载nginx
  2. 配置负载均衡。
    redis实战---分布式锁--实战篇
	# 定义 upstream 池upstream backend {server 127.0.0.1:8000;server 127.0.0.1:8001;}location / {proxy_pass http://backend;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;}	

我将我的项目复制了一分,修改了端口号,模拟部署在不同的环境上。以上配置将会以轮训的方式进行服务调用
3. 接口调用接口
redis实战---分布式锁--实战篇
可以看到,两个服务都分别被调用了一次。这时候我们使用的synchronized锁就会失效了,下面我们压测一下,看看结果。
4. 压测,复现问题。
redis实战---分布式锁--实战篇
结果:
redis实战---分布式锁--实战篇
上图中,左边是端口8001的服务,右边是8000的服务,我们发现,这两个服务虽然单独的看,销售的商品都是正确的,但是放在一起看,就会发现有相同的库存,这就说明,同一个库存被卖了两次,我们上文提到的超卖问题仍然存在!!

解决方案

自己手动实现

下面将会讲解如何自己进行手动实现分布式锁,此方式只供大家参考理解,如果项目上用,推荐集成Redisson使用其提供的解决方案。

代码

@RestController
@RequestMapping("/test")
public class IndexController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@GetMapping("lock")public  String deductStock() {//写死一个固定的商品id,作为我们被秒杀的商品String lockKey="lock:product:101";//uuid,防止删除其他人加的锁String clientId = UUID.randomUUID().toString();//进行加锁,设置过期时间为10s 注意代码的原子性Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,clientId,10, TimeUnit.SECONDS);//如果加锁失败,返回错误,秒未成功if(!result){return "error_code";}try {// 获取当前库存String stock1 = stringRedisTemplate.opsForValue().get("stock");if( stock1 == null){System.out.println("秒杀未开始");return "end";}int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {// 扣减库存int realStock = stock - 1;// 更新库存stringRedisTemplate.opsForValue().set("stock", realStock + "");System.out.println("扣减成功,剩余的库存为:" + realStock);} else {System.out.println("扣减失败,库存不足");}}finally {//如果是自己加的锁就自己删掉if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){stringRedisTemplate.delete(lockKey);}}return "end";}}

压测结果

redis实战---分布式锁--实战篇
两个项目,没有超卖问题。一共20个请求,其中6个加锁成功,消耗了库存,其余14个由于没有加锁成功,秒杀失败。

代码重点解释

1.使用setnx
SETNX 是 Redis 数据库中的一个命令,用于将一个键值对(key-value pair)设置到 Redis 中,但只有在该键不存在的情况下才会设置成功。如果该键已经存在,SETNX 命令不会对其进行任何操作,并返回 0,否则返回 1。
2.死锁问题
使用setnx进行加锁的时候,一定要设置锁的过期时间。业务完成之后,一定要及时释放锁,避免产生死锁问题。并且一定要保证加锁和设置锁的过期时间操作是原子的,避免只上锁,未设置过期时间问题的存在
3.锁续命问题
上述代码作为一个简单的分布式锁实现,在并发量不算很高的情况下,不会出现什么问题,但是它实际上还是有瑕疵的。我们上述代码,锁失效有两种可能。一种是过期,另一种是代码删除。代码删除没什么问题,我们选择将锁删除的时候,肯定是业务代码执行完毕。但是如果是过期的话,有可能我们的业务代码还没有执行完,锁先过期了,并发量大的情况下,外部不断有请求试图加锁,可能会造成锁失效的情况。

基于Redisson进行实现

我们可以通过为锁续命来解决上文所述,代码未执行完毕,锁已经过期的问题,这里将使用Redisson的解决方案

引入依赖

<!--        redisson依赖--><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.15.5</version></dependency>

代码使用

@Autowiredprivate RedissonClient redissonClient;@GetMapping("lock1")public  String deductStock1() {//写死一个固定的商品id,作为我们被秒杀的商品String lockKey="lock:product:101";//获取锁对象RLock lock = redissonClient.getLock(lockKey);//加锁,使用lock方法,锁将会自动续命lock.lock();try {// 获取当前库存String stock1 = stringRedisTemplate.opsForValue().get("stock");if( stock1 == null){System.out.println("秒杀未开始");return "end";}int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {// 扣减库存int realStock = stock - 1;// 更新库存stringRedisTemplate.opsForValue().set("stock", realStock + "");System.out.println("扣减成功,剩余的库存为:" + realStock);} else {System.out.println("扣减失败,库存不足");}return "end";}finally {//释放锁lock.unlock();}}

运行结果

我们来压测一下,发现可以实现分布式锁的效果,不会出现超卖问题
redis实战---分布式锁--实战篇

源码解析

Redisson的实现锁的续命,主要的代码在 RedissonLock类的lock方法中,下面我们来解析下它的lock方法

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 获取当前线程IDlong threadId = Thread.currentThread().getId();// 尝试获取锁,获取到了则返回null,否则返回锁的剩余过期时间Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// 如果返回null,说明锁已经被当前线程获取,直接返回if (ttl == null) {return;}// 创建一个订阅对象RFuture<RedissonLockEntry> future = subscribe(threadId);// 如果可中断,则中断等待if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {// 否则一直等待commandExecutor.syncSubscription(future);}try {while (true) {// 再次尝试获取锁,获取到了则返回null,否则返回锁的剩余过期时间ttl = tryAcquire(-1, leaseTime, unit, threadId);// 如果返回null,说明锁已经被当前线程获取,跳出循环if (ttl == null) {break;}// 如果锁剩余过期时间大于等于0,则等待指定时间if (ttl >= 0) {try {// 等待指定时间后再次尝试获取锁future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {// 如果是可中断的,则抛出异常if (interruptibly) {throw e;}// 否则继续等待future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 否则一直等待直到获取到锁if (interruptibly) {future.getNow().getLatch().acquire();} else {future.getNow().getLatch().acquireUninterruptibly();}}}} finally {// 取消订阅unsubscribe(future, threadId);}
}

在这段代码中,首先获取了当前线程的ID,并通过 tryAcquire() 方法尝试获取锁,如果获取成功(即 ttl == null),则直接返回。

如果获取不到锁,会进行如下操作:

  1. 通过 subscribe() 方法,向 Redisson 客户端发送一条消息,表示当前线程正在等待该锁的释放。
  2. 如果 interruptibly 为 true,则使用 syncSubscriptionInterrupted() 方法等待消息;否则,使用 syncSubscription() 方法等待消息。
  3. 进入循环,不断尝试获取锁(即调用 tryAcquire() 方法)。
  4. 如果获取到锁,则根据 ttl 值进行等待:
    如果 ttl 大于等于 0,则等待 ttl 时间,同时等待 Redisson 客户端发送消息,通知当前线程释放锁。
    如果 ttl 小于 0,则说明当前线程已经在 Redisson 客户端的等待队列中,直接等待通知即可。
  5. 当获取到锁时,会解除之前发送的等待消息,然后退出循环。

在上述过程中,由于不断地尝试获取锁,因此每次成功获取锁时都会重置锁的过期时间。这样就可以实现锁的自动续命了。

总结&升华

通过本篇文章,我们了解到了如何实现redis的分布式锁。学习了如何使用Redisson进行分布式锁,并且解决了锁的续命问题。
redis实战