• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

SpringBoot使用RedisTemplate+Lua脚本实现Redis分布式锁

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

SpringBoot使用RedisTemplate+Lua脚本实现Redis分布式锁

问题:定时任务部署在多台Tomcat上,因此到达指定的定时时间时,多台机器上的定时器可能会同时启动,造成重复数据或者程序异常等问题。

//发送消息,不能重复发送
@Scheduled(cron = "0 0/15 * * * ? ")
public void sendMsg(String userId) {
 
}

项目部署在Tom 1 ,Tom 2

如何控制只有一个Tomcat在同一时刻执行任务

使用分布式锁来控制,谁抢到了锁就让谁执行。

一、基于Redis实现分布式锁

package cn.pconline.pcloud.base.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @Description 使用RedisTemplate+Lua脚本实现Redis分布式锁
 * @Author jie.zhao
 * @Date 2019/11/19 11:46
 */
@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate template;

    private static final Long RELEASE_SUCCESS = 1L;

    private static final long DEFAULT_TIMEOUT = 1000 * 10;

    private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    /**
     * 尝试获取锁 立即返回
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock(String key, String value, long timeout) {
        return template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * 以阻塞方式的获取锁
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lockBlock(String key, String value, long timeout) {
        long start = System.currentTimeMillis();
        while (true) {
            //检测是否超时
            if (System.currentTimeMillis() - start > timeout) {
                return false;
            }
            //执行set命令
            //1
            Boolean absent = template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
            //其实没必要判NULL,这里是为了程序的严谨而加的逻辑
            if (absent == null) {
                return false;
            }
            //是否成功获取锁
            if (absent) {
                return true;
            }
        }
    }

    public boolean unlock(String key, String value) {
        // 使用Lua脚本:先判断是否是自己设置的锁,再执行删除
        // 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
        // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常EvalSha is not supported in cluster environment.,所以只能拿到原redis的connection来执行脚本

        List<String> keys = new ArrayList<>();
        keys.add(key);
        List<String> args = new ArrayList<>();
        args.add(value);

        Long result = template.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }

                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }
                return 0L;
            }
        });

        //返回最终结果
        return RELEASE_SUCCESS.equals(result);
    }
}

使用方法:

@Scheduled(cron = "0 0/15 * * * ? ")
public void sendExamTemplateMsg() {
    if (redisLock.lock(RedisKey.REDIS_JOB_SEND_KEY, RedisKey.REDIS_JOB_SEND_VALUE, 1000 * 60)) {
        
        //....
        log.info("定时轮询考试安排通知结束 \t" + new Date());
    } else {
        log.info("定时轮询考试安排,未获取到锁其他应用正在执行 \t" + new Date());
    }
}

二、分布式锁的要求

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

三、存在的问题

注意!!!! 该加锁方法仅针对单实例 Redis 可实现分布式加锁,或者使用场景少的业务。

原因对于 Redis 集群会有一定几率出现问题

例如:当进程1对master节点写入了锁,此时master节点宕机,slave节点提升为master而刚刚写入master的锁还未同步,此时进程2也将能够获取锁成功,此时必然会导致数据不同步问题。还有另一个问题即: key 超时之后业务并没有执行完毕但却自动释放锁了,这样就会导致并发问题。

如果需要更加健壮的Redis集群分布式锁,推荐使用Redisson


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
lua-遍历数组发布时间:2022-07-22
下一篇:
lua---用break实现continue逻辑发布时间:2022-07-22
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap