> 文章列表 > Redis分布式锁系列

Redis分布式锁系列

Redis分布式锁系列

1.压力测试出的内存泄漏及解决(可跳过)

使用jmeter对查询产品分类列表接口进行压力测试,出现了堆外内存溢出异常。
Redis分布式锁系列
我们设置的虚拟机堆内存100m,并不是堆外内存100m
Redis分布式锁系列
产生堆外内存溢出:OutOfDirectMemoryError
原因是因为:
springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信。
lettuce的bug导致netty堆外内存溢出,-Xmx300m, netty如果没有指定堆外内存,默认使用-Xmx300m作为堆外内存。
由于在高并发的时候,获取的数据量非常大,高并发一进来,由于数据在传输期间都要占内存,导致内存分配不足,堆外内存溢出。

解决方案:不能使用-Dio.netty.maxDirectoryMemory只去调大堆外内存(只是延缓了问题的出现)
1)、升级lettuce客户端, 2)、切换使用Jedis
lettuce,jedis操作redis的底层客户端,spring再次封装redisTemplate

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>

2.高并发下缓存失效的问题

2.1.缓存穿透

缓存穿透: 指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是 数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不 存在的数据每次请求都要到存储层去查询,失去了缓存的意义
Redis分布式锁系列

风险: 利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃

解决: null结果缓存,并加入短暂过期时间

2.2.缓存雪崩

缓存雪崩: 缓存雪崩是指在我们设置缓存时key采用了相同的过期时间, 导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时 压力过重雪崩。
Redis分布式锁系列

解决: 原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这 样每一个缓存的过期时间的重复率就会降低,就很难引发集体 失效的事件。

2.3.缓存击穿

缓存穿透:

  • 对于一些设置了过期时间的key,如果这些key可能会在某些 时间点被超高并发地访问,是一种非常“热点”的数据。
  • 如果这个key在大量请求同时进来前正好失效,那么所有对 这个key的数据查询都落到db,我们称为缓存击穿。
    Redis分布式锁系列

解决: 加锁大量并发只让一个去查,其他人等待,查到以后释放锁,其他 人获取到锁,先查缓存,就会有数据,不用去db

 * 1.空结果缓存,解决缓存穿透* 2.设置过期时间(加随机值),解决缓存雪崩* 3.加锁,解决缓存击穿

前两个都好解决,对于加锁的问题,如果锁没加好,又会有新的问题

直接加synchronized锁

// 只要是同一把锁,就能锁住需要这个锁的所有线程
// 1.synchronized(this):,springboot所有组件(这里是CategoryServiceImpl)在容器中都是单例的
// TODO 本地锁,syncrhonized, JUC(Lock) 分布式情况下,想要锁住所有,必须使用分布式锁
synchronized (this) {return getDataFromDb();
}

得到锁以后,应该再去缓存中确定一次,如果没有才需要继续查询。

String catelogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
if (!StringUtils.isEmpty(catelogJSON)) {// 如果缓存不为空,直接返回Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});return result;
}

分布式下如何加锁
Redis分布式锁系列
TODO 本地锁,syncrhonized, JUC(Lock) 分布式情况下,想要锁住所有,必须使用分布式锁

注意锁的时序问题
Redis分布式锁系列
要保证查数据库和把结果放入缓存是原子操作,是在同一把锁内进行的,否则就会导致释放锁的时序问题,导致多次查询数据库。
开启多个服务的时候只能锁住本地服务
Redis分布式锁系列

3.分布式锁的原理与使用

分布式锁演进-基本原理

Redis分布式锁系列
我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。 “占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。 等待可以自旋的方式。

占坑在redis中通常使用set命令,set的介绍如下:
https://www.cnblogs.com/zhouj850/p/10949359.html
SET key value [EX seconds] [PX milliseconds] [NX|XX]
设置k-v值
EX过期时间,PX毫秒,NX指的是Not Exist,不存在才给里面放。
所以使用SETNX命令可以进行占坑操作。

分布式锁演进-阶段一

Redis分布式锁系列
代码如下:
Redis分布式锁系列
setnx占好了位,业务代码异常或者程序在页面过程 中宕机。没有执行删除锁逻辑,这就造成了死锁
放在finally里面,也不行,机器宕机也会导致锁无法释放。
解决:设置锁的自动过期,即使没有删除,会自动删除

分布式锁演进-阶段二

Redis分布式锁系列

代码如下:
Redis分布式锁系列
问题: 1、setnx设置好,正要去设置过期时间,宕机。又死锁了。
解决: 设置过期时间和占位必须是原子的。redis支持使用setnx ex 命令

redis设置锁的过期时间的命令:
set lock 1111 EX 1000 NX

查询锁的过期时间命令:
ttl lock

设置过期时间,必须和加锁是同步的、原子的

分布式锁演进-阶段三

Redis分布式锁系列
代码如下:
Redis分布式锁系列
问题:
1、删除锁直接删除??? 如果由于业务时间很长,锁自己过期了,我们 直接删除,有可能把别人正在持有的锁删除了
解决: 占锁的时候,值指定为uuid,每个人匹配是自己 的锁才删除。

Redis分布式锁系列

问题:
1、如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁

获取值对比+对比成功删除锁,一定要是原子操作。

解决: 删除锁必须保证原子性。使用redis+Lua脚本完成

String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

Redis分布式锁系列
保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。

所有代码如下:

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {// 1.占分布式锁,去redis占坑String uuid = UUID.randomUUID().toString();Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("Lock", uuid, 300, TimeUnit.SECONDS);Map<String, List<Catelog2Vo>> dataFromDb;if (lock) {System.out.println("获取分布式锁成功...");try {// 加锁成功。。。执行业务// 设置过期时间,必须和加锁是同步的、原子的//stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);dataFromDb = getDataFromDb();} finally {// 获取值对比+对比成功删除锁,一定要是原子操作。
//            String lockValue = stringRedisTemplate.opsForValue().get("lock");
//            if (uuid.equals(lockValue)) {
//                // 删除自己的锁
//                stringRedisTemplate.delete("lock");
//            }String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";// 删除锁Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);}return dataFromDb;} else {// 加锁失败。。。重试synchronized(), 重试自旋// 休眠100ms重试System.out.println("获取分布锁失败,,,");try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return getCatalogJsonFromDbWithRedisLock(); // 自旋的方式}}

更难的事情,锁的自动续期

Redis简介-整合

分布式场景下,还需要用到读写锁、信号量等机制,使用上述方案只能实现简单的分布式锁,因此我们需要借助Redissson框架。完成我们所有的分布式功能。

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
参考文档:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0

        <!--以后使用redisson作为所有分布式锁,分布式对象功能框架--><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.12.0</version></dependency>

将redisson作为分布式锁等功能框架

  •  1)、引入依赖
    
  •  2)、配置redisson
    

Redisson的配置

@Configuration
public class MyRedissonConfig {/* 所有对redissson的使用都是通过RedissonClient对象* @Bean给容器中放一个组件对象* /@Bean(destroyMethod = "shutdown")public RedissonClient redisson() throws IOException {// 创建配置(单节点配置)Config config = new Config();config.useSingleServer().setAddress("redis://192.168.56.10:6379");// 根据config创建出redissonClient示例RedissonClient redissonClient = Redisson.create(config);return redissonClient;}}

Redisson-lock锁测试