javammc

简易的redis分布式锁

加锁:

set key my_random_value NX PX 30000

这个命令比setnx好,因为可以同时设置过期时间。不设置过期时间,应用挂了,解不了锁,就一直锁住了。

解锁:

if redis.call("get",KEYS[1])==ARGV[1] then
   return redis.call("del",KEYS[1])
else
  return 0
end

先比较一下值,相等才删除。防止其他线程把锁给解了。

以上方案在一般的场景就够用了,但还存在一些小问题:

  1. 如果设置过期时间3秒,但是业务执行需要4秒怎么办?

解决方案:参照redisson的看门狗,可以后台起一个线程去看看业务线程执行完了没有,如果没有就延长过期时间。

  1. redis是单点的,如果宕机了,那么整个系统就会崩溃。如果是主从结构,那么master宕机了,存储的key还没同步到slave,此时slave升级为新的master,客户端2从新的master上就能拿到同一个资源的锁。这样客户端1和客户端2都拿到锁,就不安全了。

解决方案:RedLock算法。简单说就是N个(通常是5)独立的redis节点同时执行SETNX,如果大多数成功了,就拿到了锁。这样就允许少数节点不可用。

那我们看看工业级别是怎么实现redis分布式锁的呢?

Redission实现的redis分布式锁

加锁流程:

解锁流程:

Redission加锁使用的是redis的hash结构。

  • key :要锁的资源名称
  • filed :uuid+":"+线程id
  • value : 数值型,可以实现可重入锁

源码里面用到了netty里面Promise的一些api,我列出来帮助理解:

    // 异步操作完成且正常终止
    boolean isSuccess();
    // 异步操作是否可以取消
    boolean isCancellable();
    // 异步操作失败的原因
    Throwable cause();
    // 添加一个监听者,异步操作完成时回调,类比javascript的回调函数
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 阻塞直到异步操作完成
    Future<V> await() throws InterruptedException;
    // 同上,但异步操作失败时抛出异常
    Future<V> sync() throws InterruptedException;
    // 非阻塞地返回异步结果,如果尚未完成返回null
    V getNow();

源码分析:

加锁:

public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
    
     public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        //命令执行器
        this.commandExecutor = commandExecutor;
       //uuid
        this.id = commandExecutor.getConnectionManager().getId();
        //超时时间,默认30s
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
    }
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        //获取线程id
        long threadId = Thread.currentThread().getId();
        //尝试获取锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        //ttl为空则代表加锁成功
        if (ttl == null) {
            return;
        }

  //如果获取锁失败,则订阅到对应这个锁的channel,等其他线程释放锁时,通知线程去获取锁
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
            //再次尝试获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                 //ttl大于0,则等待ttl时间后继续尝试获取锁
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
        //取消对channel的订阅
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

再来看看里面的尝试获取锁的代码:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
        //如果带有过期时间,则按照普通方式获取锁
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
         //先按照30秒的过期时间来执行获取锁的方法
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //异步执行回调监听
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
             //如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
            public void operationComplete(Future<Long> future) throws Exception {
            //没有成功执行完成
                if (!future.isSuccess()) {
                    return;
                }
       //非阻塞地返回异步结果,如果尚未完成返回null
                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

看门狗逻辑:

使用的是Netty的Timeout延迟任务做的。

  • 比如锁过期 30 秒, 每过 1/3 时间也就是 10 秒会检查锁是否存在, 存在则更新锁的超时时间

加锁脚本

 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
        //如果锁不存在,则通过hset设置它的值,并设置过期时间
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                   //如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1,并重新设置过期时间
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                     //如果锁已存在,但并非本线程,则返回过期时间
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

解锁:

public RFuture<Void> unlockAsync(final long threadId) {
        final RPromise<Void> result = new RedissonPromise<Void>();
        //底层解锁方法
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }

                Boolean opStatus = future.getNow();
                //如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

解锁脚本:

 protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                 //如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                 //通过hincrby递减1的方式,释放一次锁
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                 //若剩余次数大于0 ,则刷新过期时间
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                  //否则证明锁已经释放,删除key并发布锁释放的消息
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

相关文章: