在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
上篇redis实现的分布式锁,有一个问题,它不可重入。 所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。 同一个人拿一个锁 ,只能拿一次不能同时拿2次。 1、什么是可重入锁?它有什么作用? 可重入锁,也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。 说白了就是同一个线程再次进入同样代码时,可以再次拿到该锁。 它的作用是:防止在同一线程中多次获取锁而导致死锁发生。 2、那么java中谁实现了可重入锁了? 在java的编程中synchronized 和 ReentrantLock都是可重入锁。我们可以参考ReentrantLock的代码 3、基于ReentrantLock的可重入锁 ReentrantLock,是一个可重入且独占式的锁,是一种递归无阻塞的同步锁。 3.1、看个ReentrantLock的例子 import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class ReentrantLockDemo { //锁 private static ReentrantLock lock = new ReentrantLock(); public void doSomething(int n){ try{ //进入递归第一件事:加锁 lock.lock(); log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked()); log.info("--------递归{}次--------",n); if(n<=2){ this.doSomething(++n); }else{ return; } }finally { lock.unlock(); log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked()); } } public static void main(String[] args) { ReentrantLockDemo reentrantLockDemo=new ReentrantLockDemo(); reentrantLockDemo.doSomething(1); log.info("执行完doSomething方法 是否还持有锁:{}",lock.isLocked()); } } 3.2、执行结果
3.3、 从上面栗子可以看出ReentrantLock是可重入锁,那么他是如何实现的了,我们看下源码就知道了 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //先判断,c(state)是否等于0,如果等于0,说明没有线程持有锁 if (c == 0) { //通过cas方法把state的值0替换成1,替换成功说明加锁成功 if (compareAndSetState(0, acquires)) { //如果加锁成功,设置持有锁的线程是当前线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//判断当前持有锁的线程是否是当前线程 //如果是当前线程,则state值加acquires,代表了当前线程加锁了多少次 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ReentrantLock的加锁流程是: /** * 释放锁 * @param releases * @return */ protected final boolean tryRelease(int releases) { int c = getState() - releases;//state-1 减加锁次数 //如果持有锁的线程,不是当前线程,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//如果c==0了说明当前线程,已经要释放锁了 free = true; setExclusiveOwnerThread(null);//设置当前持有锁的线程为null } setState(c);//设置c的值 return free; } 看ReentrantLock的解锁代码我们知道,每次释放锁的时候都对state减1, 4、那么redis要怎么实现可重入的操作了? 看ReentrantLock的源码我们知道,它是加锁成功了,记录了当前持有锁的线程,并通过一个int类型的数字,来记录了加锁次数。 我们知道ReentrantLock的实现原理了,那么redis只要下面两个问题解决,就能实现重入锁了: 4.1、第一个问题:怎么保存当前持有的线程 1.上一篇文章我们用的是redis 的set命令存的是string类型,他能保存当前持有的线程吗? 4.2、第二个问题:加锁次数(重入了多少次),怎么记录维护 他能记录下来加锁次数吗? 存没问题了,但是重入次数要怎么维护了, 它肯定要保证原子性的,能解决吗?
5、我们已经知道SET是不支持重入锁的,但我们需要重入锁,怎么办呢? 目前对于redis的重入锁业界还是有很多解决方案的,最流行的就是采用Redisson。 6、什么是 Redisson? Redisson是Redis官方推荐的Java版的Redis客户端。 它基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。 它在网络通信上是基于NIO的Netty框架,保证网络通信的高性能。 在分布式锁的功能上,它提供了一系列的分布式锁;如:可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等等。 7、Redisson的分布锁如何使用 引入依赖包 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.15.5</version> </dependency> 代码 import lombok.extern.slf4j.Slf4j; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SingleServerConfig; @Slf4j public class ReentrantLockDemo1 { //锁 public static RLock lock; static { //Redisson需要的配置 Config config = new Config(); String node = "127.0.0.1:6379";//redis地址 node = node.startsWith("redis://") ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(node) .setTimeout(3000)//超时时间 .setConnectionPoolSize(10) .setConnectionMinimumIdleSize(10); //serverConfig.setPassword("123456");//设置redis密码 // 创建RedissonClient客户端实例 RedissonClient redissonClient = Redisson.create(config); //创建redisson的分布式锁 RLock rLock = redissonClient.getLock("666"); lock = rLock; } public void doSomething(int n){ try{ //进入递归第一件事:加锁 lock.lock(); log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked()); log.info("--------递归{}次--------",n); if(n<=2){ this.doSomething(++n); }else{ return; } }finally { lock.unlock(); log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked()); } } public static void test(){ log.info("--------------start---------------"); ReentrantLockDemo1 reentrantLockDemo=new ReentrantLockDemo1(); reentrantLockDemo.doSomething(1); log.info("执行完doSomething方法 是否还持有锁:{}",ReentrantLockDemo1.lock.isLocked()); log.info("--------------end---------------"); } public static void main(String[] args) { test(); } } 执行结果
看控制台打印能清楚知道Redisson是支持可重入锁了。 8、那么Redisson是如何实现的了? 我们跟一下lock.lock()的代码,发现它最终调用的是org.redisson.RedissonLock#tryLockInnerAsync的方法,具体如下: <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); } 8.1、上面的代码,用到的redis命令先梳理一下 exists 查询一个key是否存在
hincrby :将hash中指定域的值增加给定的数字 pexpire:设置key的有效时间以毫秒为单位 hexists:判断field是否存在于hash中 pttl:获取key的有效毫秒数 8.2、看lua脚本传入的参数我们知道:
protected String getLockName(long threadId) { return id + ":" + threadId; } 8.3、代码截图 从截图上可以看到,它是使用lua脚本来保证多个命令执行的原子性,使用了hash来实现了分布式锁 8.4、第一个if判断
8.5、下面来看第二个if判断
8.6、下图是redis可视化工具看到是如何在hash存储的结构 Redisson的整个加锁流程跟ReentrantLock的加锁逻辑基本相同 8.7、解锁代码位于 org.redisson.RedissonLock#unlockInnerAsync,如下: return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); } 看这个解锁的Lua脚本,流程跟Reentrantlock的解锁逻辑也基本相同没啥好说的了。 以上就是redis分布式锁-可重入锁的详细内容,更多关于redis分布式锁的资料请关注极客世界其它相关文章! |
请发表评论