在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处理的数据添加到队列当中,然后按照排队的先后顺序进行异步处理。 这个队列,可以是专业的消息队列,如 RocketMQ/RabbitMQ 等,一般项目中,如果只是为了进行异步,未免有点杀鸡用牛刀的意味。 因此,本文主要讲解基于 Redis 的方式实现异步队列。 本文首发个人技术博客: https://nullpointer.pw/redis-block-queue.html 基于 Redis 的 list 实现队列的方式也有多种,先说第一种不推荐的方式,即使用 第二种方式也是不推荐的方式,也是通过 第二种方式就是下文要介绍的方式,首先也是通过 Redis 基础# 将一个或多个值 value 插入到列表 key 的表头 LPUSH key value [value …] # 阻塞式等待,将列表 source 中的最后一个元素 (尾元素) 弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长 (block indefinitely) 。 BRPOPLPUSH source destination timeout # 根据参数 count 的值,移除列表中与参数 value 相等的元素。 LREM key count value 代码实现队列消息生产者笔者使用的是 Spring 相关 API 实现对 Redis 指令的调用。首先实现消息的生产代码,封装到一个工具类方法当中。这里很简单,就是调用了 lpush 方法,将序列化的 key 和 value 添加到列表当中去。 @Resource private RedisConnectionFactory connectionFactory; public void lPush(@Nonnull String key, @Nonnull String value) { RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory); try { byte[] byteKey = RedisSerializer.string().serialize(getKey(key)); byte[] byteValue = RedisSerializer.string().serialize(value); assert byteKey != null; connection.lPush(byteKey, byteValue); } finally { RedisConnectionUtils.releaseConnection(connection, connectionFactory); } } 代码实现队列消息消费者因为实现队列消费消息的代码比较多,不可能每个需要阻塞消费的地方,对需要写这一坨代码,因此使用 Java8 的函数式接口实现方法的传递,同时阻塞式获取消息代码使用新线程去执行。 有人看到以下代码要吐槽了,不是说不用 while(true) 吗,怎么你这里面还是有,这里稍微解释一下,因为 SpringBoot 一般会指定 timeout 的全局超时时间,即使 public void bRPopLPush(@Nonnull String key, Consumer<String> consumer) { CompletableFuture.runAsync(() -> { RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory); try { byte[] srcKey = RedisSerializer.string().serialize(getKey(key)); byte[] dstKey = RedisSerializer.string().serialize(getBackupKey(key)); assert srcKey != null; assert dstKey != null; while (true) { byte[] byteValue = new byte[0]; boolean success = false; try { byteValue = connection.bRPopLPush(0, srcKey, dstKey); if (byteValue != null && byteValue.length != 0) { consumer.accept(new String(byteValue)); success = true; } } catch (Exception ignored) { // 防止获取 key 达到超时时间抛出 QueryTimeoutException 异常退出 } finally { if (success) { // 处理成功才删除备份队列的 key connection.lRem(dstKey, 1, byteValue); } } } } finally { RedisConnectionUtils.releaseConnection(connection, connectionFactory); } }); } 测试代码@Test public void testLPush() throws InterruptedException { String queueA = "queueA"; int i = 0; while (true) { String msg = "Hello-" + i++; redisBlockQueue.lPush(queueA, msg); System.out.println("lPush: " + msg); Thread.sleep(3000); } } @Test public void testBRPopLPush() { String queueA = "queueA"; redisBlockQueue.bRPopLPush(queueA, (val) -> { // 在这里处理具体的业务逻辑 System.out.println("val: " + val); }); // 防止 Junit 进程退出 LockSupport.park(); } 项目使用方式为了方便使用,我将其抽取为了一个工具类,使用时通过 Spring 注入使用即可, @Resource private RedisBlockQueue redisBlockQueue; @PostConstruct public void init() { redisBlockQueue.bRPopLPush(xx, (value) -> { //... }); } 本文完整代码下载github 地址 到此这篇关于基于Redis实现阻塞队列的文章就介绍到这了,更多相关Redis阻塞队列内容请搜索极客世界以前的文章或继续浏览下面的相关文章希望大家以后多多支持极客世界! |
请发表评论