Redis用于全局ID生成器、分布式锁的解决方案
全局ID生成器
每个店铺都可以发布优惠卷
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增id就存在一些问题:
1.id的规律性太明显
2.受单表数据量的限制
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息
ID的组成部分
1.符号位:1bit,永远为0;
2.时间戳:31bit,以秒为单位,可以使用69年
3.序列号:32bit,秒内的计数器,支持每秒产生2⋀32个不同的ID
RedisIdWorker
@Component
public class RedisIdWorker {/* 开始时间戳*/private static final long Begin_TIMESTAMP = 1640995200L;/* 序列号的位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {//1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - Begin_TIMESTAMP;//2.生成序列号//2.1获取当前日期。精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));//2.2自增长long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);//3.拼接并返回return timestamp << COUNT_BITS | count;}
}
private ExecutorService es = Executors.newFixedThreadPool(500);@Resourceprivate RedisIdWorker redisIdWorker;@Testvoid testIdWorker() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task = () -> {for (int i = 0; i < 100; i++) {long id = redisIdWorker.nextId("order");System.out.println("id=" + id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time" + (end - begin));}
全局唯一ID生成策略
1.UUID
2.Redis自增
3.snowflake算法
4.数据库自增
Redis自增ID策略
1.每天一个key,方便统计订单量
2.ID构造是时间戳+计数器
实现优惠券秒杀下单
每个店铺都可以发布优惠券,分为平价劵。平价券可以任意购买,而特价券需要秒杀抢购
表关系如下:
tb_voucher:优惠券的基本信息,优惠金额,使用规则等
tb_seckill_vouvher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
实现优惠券秒杀下单
在VoucherController实现了一个接口,可以实现添加秒杀优惠券、
@RestController
@RequestMapping("/voucher")
public class VoucherController {@Resourceprivate IVoucherService voucherService;/* 新增普通券* @param voucher 优惠券信息* @return 优惠券id*/@PostMappingpublic Result addVoucher(@RequestBody Voucher voucher) {voucherService.save(voucher);return Result.ok(voucher.getId());}/* 新增秒杀券* @param voucher 优惠券信息,包含秒杀信息* @return 优惠券id*/@PostMapping("seckill")public Result addSeckillVoucher(@RequestBody Voucher voucher) {voucherService.addSeckillVoucher(voucher);return Result.ok(voucher.getId());}
}
用户可以在这些店铺页面中抢购这些优惠券
下单时需要判断两点
1.秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
2.库存是否充足,不足则无法下单
超卖问题
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁
乐观锁
乐观锁的关键是判断之前查询得到的数据是否被修改过,常见的方式有两种
超卖总结
超卖这样的线程安全问题,解决方案有哪些
1.悲观锁:添加同步锁,让线程串行执行
-
优点:简单粗暴
-
缺点:性能一般
2.乐观锁:不加锁,在更新时判断是否有其他线程在修改
-
优点:性能好
-
存在成功率低的问题
一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
VoucherOrderServiceImpl
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {//秒杀已经结束return Result.fail("秒杀已经结束");}//4.判断库存是否充足if (voucher.getStock() < 1) {//库存不足return Result.fail("库存不足");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {//获取代理对象IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {//5.一人一单Long userId = UserHolder.getUser().getId();//5.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//5.2判断是否存在if (count>0) {//用户已经购买过了return Result.fail("用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId).gt("stock",0).update();if (!success) {//扣减失败return Result.fail("库存不足");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2用户idLong userid = UserHolder.getUser().getId();voucherOrder.setUserId(userid);//7.3代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单idreturn Result.ok(orderId);}
}
一人一单的并发安全问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
1.我们将服务启动两份,端口分别是8081和8082:
2.然后修改nginx的conf目录下的nginx.cong文件,配置反向代理和负载均衡:
现在用户节点会在这两个节点上负载均衡,再次测试下是否存在线程安全问题
分布式锁
什么是分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
分布式锁的实现
分布式锁的核心是实现多进程之间互斥,而满足这一节点的方式有很多种,常见的有三种:
基于Redis的分布式锁
实现分布式锁需要实现的两个基本方法:
1.获取锁:
-
互斥:确保只能有一个线程获取锁
-
非阻塞:尝试一次,成功返回true,失败返回false
2.释放锁
- 手动释放
基于Redis实现分布式锁的初级版本
ILock
SimpleRedisLock
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;private static final String KEY_PREFIX = "lock:";public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {long threadId = Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}
}
基于Redis实现分布式锁的初级版本存在问题分析
1,业务阻塞超时或业务未执行完释放其他线程的锁
2.释放锁之前判断是否是该线程的锁
改进Redis的分布式锁
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致
-
如果一致则释放锁
-
如果不一致不释放锁
SimpleRedisLock
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";@Overridepublic boolean tryLock(long timeoutSec) {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁中的标识String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判断标识是否一致if (threadId.equals(id)) {//释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
改进Redis的分布式锁后存在问题分析
锁判断和锁释放不是原子性
Redis的Lua脚本
Redist提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:
https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,语法如下:
例如我们要执行set name jack,则脚本是这样:
例如,我们要先执行set name Rose,再执行get name,则脚本如下
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
例如,我们要执行redis.call(‘set’,‘name’,‘jack’)这个脚本,语法如下:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
释放锁的业务流程是这样的:
1.获取锁中的线程标示
2.判断是否与指定的标示(当前线标示)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做
利用Lua脚本来表示则是这样的:
再次改进Redis的分布式锁
需求:基于Lua脚本实现分布式锁的释放逻辑
提示:RedisTemplate调用Lua脚本的API如下:
unlock.lua
-- 比较线程标示与锁中的标示是否一致
if (redis.call('get', 'KEYS[1]') == 'ARGV[1]') then-- 释放锁 del keyreturn redis.call('del', KEYS[1])
end
return 0
SimpleRedisLock
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}
}
基于Redis实现的分布式锁总结
1.基于Redis的分布式锁实现思路:
-
利用set nx ex获取锁,并设置过期时间,保存线程标示
-
释放锁时先判断线程标示是否与自己一致,一致则删除锁
2.特性:
-
利用set nx满足互斥性
-
利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
-
利用Redis集群保证高可用和高并发性
基于Redis的分布式锁优化
基于setnx实现的分布式锁存在下面问题:
Redisson
Redisson是一个在Redis的基础上实现java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
官网地址:https://redisson.org
GitHub地址:https://github.com/redisson/redisson
Redisson入门
1.引入依赖
2.配置Redisson客户端
3.使用Redisson的分布式锁
Redisson可重入锁原理
获取锁的Lua脚本
释放锁的Lua脚本
Redisson分布式锁原理
1.可重入:利用hash结构记录线程id和重入次数
2.可重试:利用信号量和PubSub和功能实现等待、唤醒、获取锁失败的重试机制
3.超时续约:利用watchDog,每隔一段时间,重置超时时间
Redisson分布式锁主从一致性问题
分布式锁总结
1.不可重入Redis分布式锁:
-
原理:利用setnx的互斥性,利用ex避免死锁,释放锁时,判断线程标示
-
缺陷:不可重入、无法重试,锁超时失效
2.可重入的Redis分布式锁
-
原理:利用hash结构,记录线程和重入次数,利用watchDog延续时间,利用信号量控制锁重试等待
-
缺陷:redis宕机引起锁失效问题
3.Redisson的multiLock
-
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
-
缺陷:运维成本高,失效复杂
视频地址