由于redis是单线程的,所以看lua脚本的时候只需要使用单线程的思维去看就行了,而且个人不建议使用lua脚本编写太复杂的业务,特别是有循环的,写得不好可能会使redis陷入卡顿,甚至死循环直接卡死无法运行其他命令。感兴趣可以自己试一下。
问题1 :为什么 redisson 实现分布式锁的数据类型是 hash 而不是 string。
我的理解:为了支持两个参数 ( 可重入计数 + 线程标识 ) ,如果不使用hash无法实现。其中这个hash里只有一个元素,不会出现两个元素。
非公平锁
实现方式 :hash + PubSub
加锁(tryLockInnerAsync)
--[[ 参数 Collections.singletonList( this.getName() -- key1 ), new Object[]{ this.internalLockLeaseTime, --arg1 this.getLockName(threadId)} --arg2 ]]-- -- 如果锁对应的hash不存在 if (redis.call(\'exists\', KEYS[1]) == 0) then -- 加锁成功,并且设置过期时间 redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; -- 如果锁存在 if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then -- 进行计数+1 (为了可重入) redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; -- 获取剩余过期时间 return redis.call(\'pttl\', KEYS[1]);
--[[ 参数 Arrays.asList( this.getName(), --key1 this.getChannelName() --key2 ), new Object[]{ LockPubSub.unlockMessage, --arg1 this.internalLockLeaseTime, --arg2 this.getLockName(threadId) --arg3 } ]]-- -- 锁对应的hash不存在 if (redis.call(\'exists\', KEYS[1]) == 0) then -- 通知抢锁。 redis.call(\'publish\', KEYS[2], ARGV[1]); --结束 return 1; end; -- 如果锁不存在,不处理 if (redis.call(\'hexists\', KEYS[1], ARGV[3]) == 0) then return nil; end; --对其中的元素进行计数-1 实现可重入 local counter = redis.call(\'hincrby\', KEYS[1], ARGV[3], -1); -- 如果此时还有计数 if (counter > 0) then -- 刷新过期时间 redis.call(\'pexpire\', KEYS[1], ARGV[2]); return 0; else -- 解锁,通知其他线程争抢锁。 redis.call(\'del\', KEYS[1]); redis.call(\'publish\', KEYS[2], ARGV[1]); return 1; end; return nil;
公平锁
实现方式 :list + zset + hash + PubSub
zset用于处理过期时间
list用于处理先后顺序
加锁 (tryLockInnerAsync)
--[[ 参数 Arrays.asList( this.getName(), --key1 this.threadsQueueName, --key2 this.timeoutSetName), --key3 new Object[]{ this.internalLockLeaseTime, --arg1 this.getLockName(threadId), --arg2 currentTime + 5000L, --arg3 currentTime} --arg4 ]]-- while true do -- 取出队列中队头元素 local firstThreadId2 = redis.call(\'lindex\', KEYS[2], 0); --队列中没有元素了就结束 if firstThreadId2 == false then break; end; -- 从zset中获取对应元素的过期时间 local timeout = tonumber(redis.call(\'zscore\', KEYS[3], firstThreadId2)); -- 已到过期时间 if timeout <= tonumber(ARGV[4]) then -- 从过期zset和队列中移除 redis.call(\'zrem\', KEYS[3], firstThreadId2); redis.call(\'lpop\', KEYS[2]); else break; --直至没有过期元素就结束 end; end; -- 没有获取锁 if (redis.call(\'exists\', KEYS[1]) == 0) and ( (redis.call(\'exists\', KEYS[2]) == 0) -- 队列中没有元素 or (redis.call(\'lindex\', KEYS[2], 0) == ARGV[2]) -- 或者元素在队列的头部 ) then --从set和队列中移除,并且加锁成功 redis.call(\'lpop\', KEYS[2]); redis.call(\'zrem\', KEYS[3], ARGV[2]); redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; -- 如果已经获取到了锁,那么就进行计数+1 表示重入。 if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; -- 获取队列的队头元素 local firstThreadId = redis.call(\'lindex\', KEYS[2], 0); local ttl; -- 如果元素不存在并且不是加锁的元素 -- (判断是否是刚加锁成功的) *只用于计算超时时间 if firstThreadId ~= false and firstThreadId ~= ARGV[2] then -- 计算剩余的时间 (zset获取后计算) ttl = tonumber(redis.call(\'zscore\', KEYS[3], firstThreadId)) - tonumber(ARGV[4]); else -- 获取剩余时间 (直接获取) ttl = redis.call(\'pttl\', KEYS[1]); end; --计算超时时间 local timeout = ttl + tonumber(ARGV[3]); -- 向set中保存超时时间 if redis.call(\'zadd\', KEYS[3], timeout, ARGV[2]) == 1 then -- 保存成功向队列尾部添加 redis.call(\'rpush\', KEYS[2], ARGV[2]); end; return ttl;
解锁(unlockInnerAsync)
--[[ Arrays.asList( this.getName(), --key1 this.threadsQueueName, --key2 this.timeoutSetName, --key3 this.getChannelName() --key4 ), new Object[]{ LockPubSub.unlockMessage, --arg1 this.internalLockLeaseTime, --arg2 this.getLockName(threadId), --arg3 System.currentTimeMillis --arg4 } ]]-- -- 与加锁过程相同,作用是清除过期的等待者 while true do local firstThreadId2 = redis.call(\'lindex\', KEYS[2], 0); if firstThreadId2 == false then break; end; local timeout = tonumber(redis.call(\'zscore\', KEYS[3], firstThreadId2)); if timeout <= tonumber(ARGV[4]) then redis.call(\'zrem\', KEYS[3], firstThreadId2); redis.call(\'lpop\', KEYS[2]); else break; end; end; --如果锁对应的hash不存在 if (redis.call(\'exists\', KEYS[1]) == 0) then -- 并且队列中有元素,发消息通知下一个线程竞争锁 local nextThreadId = redis.call(\'lindex\', KEYS[2], 0); -- 队列中还有元素 if nextThreadId ~= false then redis.call(\'publish\', KEYS[4] .. \':\' .. nextThreadId, ARGV[1]); end; --结束 return 1; end; -- 如果锁不存在 直接结束 if (redis.call(\'hexists\', KEYS[1], ARGV[3]) == 0) then return nil; end; --对持有的锁进行计数-1,表示可重入 local counter = redis.call(\'hincrby\', KEYS[1], ARGV[3], -1); -- 如果大于0说明还有持有,刷新持有锁的过期时间。 if (counter > 0) then redis.call(\'pexpire\', KEYS[1], ARGV[2]); --结束 return 0; end; -- 解锁 redis.call(\'del\', KEYS[1]); -- 获取队列的队头元素 local nextThreadId = redis.call(\'lindex\', KEYS[2], 0); -- 如果还有元素,通知下一个线程抢锁。 if nextThreadId ~= false then redis.call(\'publish\', KEYS[4] .. \':\' .. nextThreadId, ARGV[1]); end; --结束 return 1;