Redisson分布式锁源码分析

分布式锁的特点

  • 互斥性
  • 锁超时
  • 可重入性
  • 支持阻塞和非阻塞
  • 性能好

Redisson实现分布式锁示例

public class DistributedLockerImpl {

    private RedissonClient redissonClient;

    //加锁
    public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
        //获取锁对象
        RLock lock = redissonClient.getLock(lockKey);
        try {
            //最常用的使用方法
            //waitTime:获取锁的最大等待时间
            //leaseTime:获取锁成功的持有时间,过期自动解锁
            //unit:时间单位
            return lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            log.error("tryLock error", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }
        
    //解锁
    public void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        if(lock != null) {
            lock.unlock();
        }
    }
}

加锁源码分析

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        //加锁核心代码
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        /**
        订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
	基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件
	一旦锁释放会发消息通知待等待的线程进行竞争.
        */
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            //如果获取锁的耗时超过最大等待时间,加锁失败
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        	//在最大等待时间内循环获取锁
            while (true) {
                long currentTime = System.currentTimeMillis();
                //获取锁
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
				//判断时间
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message,等待解锁消息
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
				//更新等待时间
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            //取消订阅消息
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

如果tryAcquire()获取锁成功,返回null,ttl为已存在锁的过期时间。

如果tryAcquire()获取锁失败,判断最大等待时间是否大于加锁耗时时间,如果小于,直接返回false加锁失败。否则,客户端的线程id通过redis的channel订阅锁释放的事件,超过最大等待时间,则加锁失败,如果等到了锁的释放事件通知,则进入一个不断获取锁的循环,尝试加锁。

加锁核心代码:
tryAcquireAsync():

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //如果leaseTime!=-1,加锁成功直接返回,否则启动一个Watch Dog线程,定期延长持有锁时间
    if (leaseTime != -1) {
        //加锁
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
		// 续锁逻辑
        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

tryLockInnerAsync():

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call(‘exists‘, KEYS[1]) == 0) then " +
                        "redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
                        "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
                        "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call(‘pttl‘, KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

debug结果:
Redisson分布式锁源码分析

lua 脚本含义:

第一段 if 判断语句,就是用 exists myLock 命令判断一下,如果要加锁的key 不存在,就使用 hincrby 命令设置一个 hash 结构,接着会执行 pexpire myLock 1800000 命令,设置 myLock 这个锁 key 的生存时间是 1800 秒。到此为止,加锁完成。

如果此时有第二个客户端请求加锁,就会进入第二个if判断语句,判断myLock 锁 key 的 hash 数据结构中,是否包含客户端 2 的 ID,如果不是,客户端 2 会执行:return redis.call(‘pttl‘, KEYS[1]);返回了myLock持有锁的时间。

可重入锁
如果myLock 锁 key 的 hash 数据结构中,包含客户端的 ID,就会执行hincrby myLock 5f431de4-a523-4bd7-965b-32b83fee3897:366 1 ,加锁次数加1,此时存储结果如下:
Redisson分布式锁源码分析
redis的key为锁名称(所示myLock),hash结构key为客户端ID,value为客户端加锁的次数

解锁源码分析

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    //释放锁
    RFuture<Boolean> future = unlockInnerAsync(threadId);
	//取消watch dog机制
    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

unlockInnerAsync()

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call(‘hexists‘, KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call(‘hincrby‘, KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call(‘pexpire‘, KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call(‘del‘, KEYS[1]); " +
                    "redis.call(‘publish‘, KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

lua脚本含义:

判断锁搜否存在,如果存在,将客户端ID对应的value值递减,直到为0后删除缓存,然后向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息

Redisson方案缺点

  1. 如果你对某个 Redis Master 实例完成了加锁,此时 Master 会异步复制给其对应的 slave 实例。在这期间,假如 Master 宕机,主备切换,slave 变为了 Master。此时客户端 2 来尝试加锁的时候,在新的 Master 上完成了加锁,而客户端 1 也以为自己成功加了锁,此时就会导致多个客户端对一个分布式锁完成了加锁,失去控制。这是Redis Master-Slave 架构的主从异步复制导致的 Redis 分布式锁的最大缺陷(在 Redis Master 实例宕机的时候,可能导致多个客户端同时完成加锁)。

Redisson分布式锁源码分析

上一篇:Routine Afternoon Tea


下一篇:Vue 中 watch 的一个坑