> 文章列表 > 【面试 分布式锁详细解析】续命 自旋锁 看门狗 重入锁,加锁 续命 解锁 核心源码,lua脚本解析,具体代码和lua脚本如何实现

【面试 分布式锁详细解析】续命 自旋锁 看门狗 重入锁,加锁 续命 解锁 核心源码,lua脚本解析,具体代码和lua脚本如何实现

【面试 分布式锁详细解析】续命 自旋锁 看门狗 重入锁,加锁 续命 解锁 核心源码,lua脚本解析,具体代码和lua脚本如何实现

Redisson实现分布式锁原理

自己实现锁续命

  • 在 controller 里开一个 线程 (可以为 守护线程)
    • 每10秒,判断一个 这个 UUID是否存在,如果 存在,重置为 30秒。
    • 如果不存在,守护线程 也结束。

基本的key value

atguiguLock 锁的名字,是redis的 keyhasHkey为:
82a218d0-27c8-4028-a8ca-dfa514da61c7:71 #UUID:线程IDvalue为:1。如果再次调用lock,会变成2

基本的使用

  • setIfAbsent存在不设置
        RLock lock = redisson.getLock(REDIS_LOCK);lock.lock(30, TimeUnit.SECONDS);//如果是被锁定的if (lock.isLocked()) {//如果是 当前线程持有的话if (lock.isHeldByCurrentThread()) {//才进行解锁lock.unlock();}}//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中String uuid = UUID.randomUUID().toString();//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!// 访问skuId 为25号的商品 100008348542String locKey = "lock:" + "25"; // 锁住的是每个商品的数据// 3 获取锁Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);//锁的key,锁的值uuid,锁3秒//如果这个锁存在,就不设置

16384

Redis 集群没有使用一致性hash, 而是引入了哈希槽的概念。

Redis Cluster 采用虚拟哈希槽分区,所有的键根据哈希函数映射到 0 ~ 16383 整数槽内,每个key通过CRC16校验后对16384取模来决定放置哪个槽(Slot),每一个节点负责维护一部分槽以及槽所映射的键值数据。

计算公式:slot = CRC16(key) & 16383。

这种结构很容易添加或者删除节点,并且无论是添加删除或者修改某一个节点,都不会造成集群不可用的状态。使用哈希槽的好处就在于可以方便的添加或移除节点。

  • 当需要增加节点时,只需要把其他节点的某些哈希槽挪到新节点就可以了;
  • 当需要移除节点时,只需要把移除节点上的哈希槽挪到其他节点就行了。

大神的回答

1. 定位 Master 的节点:16384 取模

  • 这里应该是 通过 myLock 决定 存到 哪个地方
  1. 客户端线程,在 底层是 如何实现 加锁

第一步,先定位 Master 的节点,

通过key,就是 redisson.getLock(“myLock”)的字符串参数,

  • myLock 计算出 循环冗余校验码的值,
  • 再用 该 循环冗余 校验码对 16384 取模,得到 hash slot
  • 通过这个 hash solt,定位redis-cluster的集群当中的master
    的节点

2. 加锁:UUID:ThreadID设置为1

第二步:加锁

  • 加锁底层逻辑是通过Lua脚本来实现的,
  • 如果客户端线程第一次去加锁的话,会在key对应的hash数据结构当中,添加线程标识,UUID:ThreadID 1
  • 指定该线程当前对这个key加锁一次了,并设置锁的过期时间为30秒

客户段线程是如何维持加锁的

  • 当加锁成功以后,此时会对加锁的结果设置一个监听器
  • 如果监听到加锁成功了,也就是返回的结果为null
  • 就会在后台通过watchdog开门狗机制,启动一个后台定时任务
    • 每隔10秒执行一次检查,如果当前key依然存在,就重置key的存活时间为30秒
    • 维持加锁,底层就是通过后台这样一个,线程定时刷新存活时间维持的

如何实现可重入锁:UUID:ThreadID 的值 +1

相同的客户端线程是如何实现可重入加锁的

  • 第一次加锁时,会往key对应的hash数据结构当中,设置UUID:ThreadID 1,表示当前线程对key加锁一次
  • 如果相同的线程来再次对这个key加锁,只需要将UUID:ThreadlD持有锁的次数加1即可,就为UUID:ThreadID 2了
  • redisson底层就是通过这样的数据结构,来表示重入加锁的语义的

其他线程加锁失败时:自旋锁

  • 底层是如何实现阻塞的,通过key对应的hash结构当中的UUID:threadid判断是否为当前线程ID
  • 如果不是,则线程加锁失败,如果没有设置锁取所超时时间,
  • 此时就会进入一个while的死循环中,一直尝试加锁直到加锁成功才会返回

宕机锁是如何释放的:没看门狗了

  • 相应的watchdog后台定时任务,当然已经没了
  • 此时就无法对key进行定时续期,那么当指定存活时间过后,
  • key就会自动失效,锁当然也就自动释放了

如何主动释放持有的锁:lua脚本,扣重入次数

  • 底层同样也是通过执行Lua脚本的方式,
  • 如果判断当前释放的key存在,并且在key的hash结构当中
    • 存在当前线程的加锁信息,那么此时就会减扣当前线程对这个key的重入锁次数
    • 减扣线程的重入锁次数之后,如果当前线程在这个key的重入次数为0,此时就会直接释放锁,
    • 如果当前线程,在这个key中重入锁次数依然大于0,此时就直接重置一下,Key的续期时间为30秒
      • 然后 value - 1 (减去 1)

锁超时的机制:不指定超时 一直获取

7:客户端尝试获取锁超时时间的机制,底层是如何实现的

  • 如果在加锁时就指定呢,尝试获取锁超时时间,
  • 如果获取所失败,此时就不会永无止境,的在while循环里面一直等待
  • 而是根据你指定的锁超时时间,在这段时间范围内获取不到锁,那么就会标记为获取所失败
  • 直接返回 false

锁超时自动释放:指定超时的没看门狗

8客户端锁超时自动释放机制在底层又是如何实现的

  • 如果在加锁时指定的锁超时时间,那么就算你获取所成功了
  • 也不会开启watch dog 他定时任务了,
  • 此时直接 就将当前持有的这把锁的过期时间,设置为你指定的超时时间
  • 那么当你指定的时间到了之后,Key失效被删除了,key对应的锁相应的也就自动释放了
slot
英
/slɒt/
n.
(可投入东西的)狭长孔,狭槽;
v.
把……投入窄孔中,把……放到指定位置;为……安排时间,安置;

* 我的整理

加锁核心方法

lock 和 tryAcquireAsync

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);}
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {}}
acquire
英
/əˈkwaɪə(r)/
v.
获得,得到;学到,习得;患上(疾病);逐渐具有,开始学会

最终调用tryLockInnerAsync

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {//过期时间为 30000毫秒this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if下面的 lua脚本", Collections.singletonList(this.getName()), this.internalLockLeaseTime, //30000毫秒this.getLockName(threadId));}//锁名字:就是 ID:线程名。id默认是uuidprotected String getLockName(long threadId) {return this.id + ":" + threadId;}

看门狗的超时

    //初始化 时间为30秒。3万毫秒public Config() {this.transportMode = TransportMode.NIO;this.lockWatchdogTimeout = 30000L;}

加锁的 lua 脚本

基本命令学习

PEXPIRE key milliseconds 设置key的有效时间以毫秒为单位
#pexpireexists num11 //不存在,返回为0
(integer) 0PTTL key  获取key的有效毫秒数
#pttl

基本结构

key为:atguiguLock
hashKey为:a4fafbec-ad68-4ee1-9420-ed22e547083f:71
hash值为:1

核心脚本 hincrby 加锁

// 如果 keys[1](lockKey) == 0,说明不存在,执行这些逻辑。// keys[1] 不存在,使用hincrby命令,设置redis的key为:lockKey。
// 设置 hash的key为:UUID:线程名ID,hash的值为:1//早起的版本为 hset设置。新版为:hincrby
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); //调用 过期时间为 参数130万毫秒,30秒redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end; //如果 这个 hash 存在(即:hexists 返回1if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then //使用 hincrby,增加1redis.call('hincrby', KEYS[1], ARGV[2], 1); //重置过期时间为 30秒redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end; return redis.call('pttl', KEYS[1]);

续命的方法

onComplete 和 scheduleExpirationRenewal

  • RFuture执行完毕后 回调
  • 嵌套调用 TimerTask 是 netty包下的类,
    • 续命完成后调用 onComplete,方法里 又调用了自己。
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining == null) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;
    private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();RedissonLock.ExpirationEntry oldEntry =(RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);//续命逻辑在这里this.renewExpiration();}}

续命外层方法renewExpiration

private void renewExpiration() {RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) {Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {if (res) {//锁续命完毕,执行 onComplete 方法,又调用 他自己。RedissonLock.this.renewExpiration();}}});}}}//延迟执行的参数,为 超时时间 / 3L}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}}

最底层 调用lua脚本续命

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if 续命的脚本", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));}

续命的脚本 pexpire

  • 如果这把锁还在,重新设置 pexpire
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; 
return 0;

解锁的方法unlockInnerAsync

核心代码

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if 下面的lua脚本", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));}

解锁的 lua脚本

  • 会发布一个解锁的消息
  • 发布的通道为:redisson_lock__channel
    • 注意 第二个是 __双下划线
// 如果 这个 hashKey 不存在,直接返回0
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; //否则,这个 key存在,就 减去1,接收返回值,
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //如果返回值 大于0,就续期30秒。
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; //如果这个 结果 不大于0,就删除,并发发布一个解锁的消息
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; 
return nil;PUBLISH channel message
发布一条消息到频道
this.getChannelName() 发布到:
LockPubSub.UNLOCK_MESSAGE, public static final Long UNLOCK_MESSAGE = 0L;//最终返回:redisson_lock__channel:{atguiguLock}String getChannelName() {return prefixName("redisson_lock__channel", this.getName()); //atguiguLock}

总结 敲黑板

背会 RFuture onComplete BiConsumer

exists hincrby pexpire

Redisson 实现原理

  • lock 加锁 使用lua脚本
    • 加锁成功,放入的是一个 redis key为: 你设置的锁名字,value为 hash
      • hash的 key为:uuid:线程id,第一次 过来这个线程,值为1
        • 这个值为1,就是做 可重入锁,如果当前线程 再次加锁,值变为2
      • 默认设置超时时间为 30秒
      • 启动看门狗,看门狗每 10秒 执行一次(超时时间/3),如果当前key还存在 就续期 成30秒。
    • 加锁失败,进入自旋 抢锁逻辑。
  • unlock解锁 依然使用lua脚本
  • lock 加锁,如果设置了 超时间的 ,就没有看门够机制,到了超时时间,会自动释放锁。

具体代码怎么做的

  • 使用 RFuture 类,
    • 是 Redisson 自己写的一个接口,核心 有个回调方法 onComplete,
  • 使用了 BiConsumer (函数式消费者 接口,2个参数的)
    • 所以 在执行完毕后 会执行 onComplete 回调方法,方法里执行看门狗逻辑
    • 看门狗里的代码 最终依然是 RFuture 类,每次正常续期后,睡10秒后,在 onComplete 再次调用自己,决定是否再次续期。
RFuture<V> extends Future<V>, CompletionStage{void onComplete(BiConsumer<? super V, ? super Throwable> var1);
}

具体lua脚本如何做的

加锁

  • 加锁用 exists 命令判断 为0是不存在,如果 不存在 才加锁。加锁的命令为 hincrby,早起的版本3.6左右用 hset,3.13 用 hincrby
  • 使用命令 pexpire 设置过期的毫秒,默认为 3万,即:30秒。
  • 使用 hexists 判断 当前 redis 锁的hash key否存在(redis key是锁名字,里面hash 的 key 为 UUID:线程ID ),为1是存在。
    • 如果 存在,就把这个 hash的 value +1,依然使用 hincrby
  • 最后使用 pexpire ,把当前值,重新设置为:30秒。
  • 脚本的返回为 pttl 当前 这个redis 剩余的毫秒数
//加锁的逻辑
//早起的版本为 hset设置。新版为:hincrby
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); //调用 过期时间为 参数130万毫秒,30秒redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end; //如果 这个 hash 存在(即:hexists 返回1if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then //使用 hincrby,增加1redis.call('hincrby', KEYS[1], ARGV[2], 1); //重置过期时间为 30秒redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end;return redis.call('pttl', KEYS[1]);

续命

  • 续命:lua脚本使用的 hexists ,如果 redis 里这个 hash存在,pexpire 续命为 3万毫秒。
//续命的逻辑
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; 
return 0;

解锁

  • 解锁:依然使用 hexists,如果 这个redis hash的key 存在,在使用 hincrby 增加 -1,就是 减去1
    • 判断这个值 减去1后 是否 >0,如果 > 0,就 重新设置为 3万毫秒
    • 否则 就删除这条 redis,并发送一个 解锁的消息。发布内容为 0,通道为:redisson_lock__channel:{你的锁key}
//释放锁的逻辑
// 如果 这个 hashKey 不存在,直接返回0
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; //否则,这个 key存在,就 减去1,接收返回值,
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //如果返回值 大于0,就续期30秒。
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; 
//如果这个 结果 不大于0,就删除,并发发布一个解锁的消息
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; 
return nil;PUBLISH channel message
发布一条消息到频道this.getChannelName() 发布到:
LockPubSub.UNLOCK_MESSAGE, public static final Long UNLOCK_MESSAGE = 0L;//最终返回:redisson_lock__channel:{atguiguLock}String getChannelName() {return prefixName("redisson_lock__channel", this.getName()); //atguiguLock}