在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。 候选方案对比 下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。
如果应用的数据量不高,实时性要求比较低,选用调度框架和 里面刚好用到了调度框架和 由于PPT中没有任何的代码或者框架贴出,有些需要解决的技术点需要自行思考,下面会重现一次整个方案实现的详细过程。 场景设计 实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做 否决的候选方案实现思路 下面介绍一下其它四个不选用的候选方案,结合一些伪代码和流程分析一下实现过程。 JDK内置延迟队列
public class DelayQueueMain { private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class); public static void main(String[] args) throws Exception { DelayQueue<OrderMessage> queue = new DelayQueue<>(); // 默认延迟5秒 OrderMessage message = new OrderMessage("ORDER_ID_10086"); queue.add(message); // 延迟6秒 message = new OrderMessage("ORDER_ID_10087", 6); queue.add(message); // 延迟10秒 message = new OrderMessage("ORDER_ID_10088", 10); queue.add(message); ExecutorService executorService = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setName("DelayWorker"); thread.setDaemon(true); return thread; }); LOGGER.info("开始执行调度线程..."); executorService.execute(() -> { while (true) { try { OrderMessage task = queue.take(); LOGGER.info("延迟处理订单消息,{}", task.getDescription()); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } }); Thread.sleep(Integer.MAX_VALUE); } private static class OrderMessage implements Delayed { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); /** * 默认延迟5000毫秒 */ private static final long DELAY_MS = 1000L * 5; /** * 订单ID */ private final String orderId; /** * 创建时间戳 */ private final long timestamp; /** * 过期时间 */ private final long expire; /** * 描述 */ private final String description; public OrderMessage(String orderId, long expireSeconds) { this.orderId = orderId; this.timestamp = System.currentTimeMillis(); this.expire = this.timestamp + expireSeconds * 1000L; this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId, LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F)); } public OrderMessage(String orderId) { this.orderId = orderId; this.timestamp = System.currentTimeMillis(); this.expire = this.timestamp + DELAY_MS; this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId, LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F)); } public String getOrderId() { return orderId; } public long getTimestamp() { return timestamp; } public long getExpire() { return expire; } public String getDescription() { return description; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } } } 注意一下, 10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 开始执行调度线程... 10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13 10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14 10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18 调度框架 + MySQL 使用调度框架对
引入 <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> <version>2.1.7.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.1</version> <scope>test</scope> </dependency> 假设表设计如下: CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; USE `delayTask`; CREATE TABLE `t_order_message` ( id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(50) NOT NULL COMMENT '订单ID', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建日期时间', edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期时间', retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '重试次数', order_status TINYINT NOT NULL DEFAULT 0 COMMENT '订单状态', INDEX idx_order_id (order_id), INDEX idx_create_time (create_time) ) COMMENT '订单信息表'; # 写入两条测试数据 INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087'); 编写代码: // 常量 public class OrderConstants { public static final int MAX_RETRY_TIMES = 5; public static final int PENDING = 0; public static final int SUCCESS = 1; public static final int FAIL = -1; public static final int LIMIT = 10; } // 实体 @Builder @Data public class OrderMessage { private Long id; private String orderId; private LocalDateTime createTime; private LocalDateTime editTime; private Integer retryTimes; private Integer orderStatus; } // DAO @RequiredArgsConstructor public class OrderMessageDao { private final JdbcTemplate jdbcTemplate; private static final ResultSetExtractor<List<OrderMessage>> M = r -> { List<OrderMessage> list = Lists.newArrayList(); while (r.next()) { list.add(OrderMessage.builder() .id(r.getLong("id")) .orderId(r.getString("order_id")) .createTime(r.getTimestamp("create_time").toLocalDateTime()) .editTime(r.getTimestamp("edit_time").toLocalDateTime()) .retryTimes(r.getInt("retry_times")) .orderStatus(r.getInt("order_status")) .build()); } return list; }; public List<OrderMessage> selectPendingRecords(LocalDateTime start, LocalDateTime end, List<Integer> statusList, int maxRetryTimes, int limit) { StringJoiner joiner = new StringJoiner(","); statusList.forEach(s -> joiner.add(String.valueOf(s))); return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " + "AND order_status IN (?) AND retry_times < ? LIMIT ?", p -> { p.setTimestamp(1, Timestamp.valueOf(start)); p.setTimestamp(2, Timestamp.valueOf(end)); p.setString(3, joiner.toString()); p.setInt(4, maxRetryTimes); p.setInt(5, limit); }, M); } public int updateOrderStatus(Long id, int status) { return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?", p -> { p.setInt(1, status); p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now())); p.setLong(3, id); }); } } // Service @RequiredArgsConstructor public class OrderMessageService { private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class); private final OrderMessageDao orderMessageDao; private static final List<Integer> STATUS = Lists.newArrayList(); static { STATUS.add(OrderConstants.PENDING); STATUS.add(OrderConstants.FAIL); } public void executeDelayJob() { LOGGER.info("订单处理定时任务开始执行......"); LocalDateTime end = LocalDateTime.now(); // 一天前 LocalDateTime start = end.minusDays(1); List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT); if (!list.isEmpty()) { for (OrderMessage m : list) { LOGGER.info("处理订单[{}],状态由{}更新为{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS); // 这里其实可以优化为批量更新 orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS); } } LOGGER.info("订单处理定时任务开始完毕......"); } } // Job @DisallowConcurrentExecution public class OrderMessageDelayJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService"); service.executeDelayJob(); } public static void main(String[] args) throws Exception { HikariConfig config = new HikariConfig(); config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8"); config.setDriverClassName(Driver.class.getName()); config.setUsername("root"); config.setPassword("root"); HikariDataSource dataSource = new HikariDataSource(config); OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource)); OrderMessageService service = new OrderMessageService(orderMessageDao); // 内存模式的调度器 StdSchedulerFactory factory = new StdSchedulerFactory(); Scheduler scheduler = factory.getScheduler(); // 这里没有用到IOC容器,直接用Quartz数据集合传递服务引用 JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put("orderMessageService", service); // 新建Job JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class) .withIdentity("orderMessageDelayJob", "delayJob") .usingJobData(jobDataMap) .build(); // 新建触发器,10秒执行一次 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("orderMessageDelayTrigger", "delayJob") .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever()) .build(); scheduler.scheduleJob(job, trigger); // 启动调度器 scheduler.start(); Thread.sleep(Integer.MAX_VALUE); } } 这个例子里面用了 11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED' Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally. NOT STARTED. Currently in standby mode. Number of jobs executed: 0 Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads. Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered. 11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties' 11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1 11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started. 11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers 11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob 11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53 11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers 11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob 11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始执行...... 11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4 11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af 11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7 11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369 11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017 11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0) 11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query 11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?] 11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource 11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1'] 11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10086],状态由0更新为1 11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update 11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?] 11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource 11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10087],状态由0更新为1 11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update 11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?] 11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource 11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始完毕...... 11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers 11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob 11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers RabbitMQ死信队列 使用
画个图描述一下这两个特性: 下面为了简单起见, <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> <scope>test</scope> </dependency> 代码如下: public class DlxMain { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class); public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel producerChannel = connection.createChannel(); Channel consumerChannel = connection.createChannel(); // dlx交换器名称为dlx.exchange,类型是direct,绑定键为dlx.key,队列名为dlx.queue producerChannel.exchangeDeclare("dlx.exchange", "direct"); producerChannel.queueDeclare("dlx.queue", false, false, false, null); producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key"); Map<String, Object> queueArgs = new HashMap<>(); // 设置队列消息过期时间,5秒 queueArgs.put("x-message-ttl", 5000); // 指定DLX相关参数 queueArgs.put("x-dead-letter-exchange", "dlx.exchange"); queueArgs.put("x-dead-letter-routing-key", "dlx.key"); // 声明业务队列 producerChannel.queueDeclare("business.queue", false, false, false, queueArgs); ExecutorService executorService = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("DlxConsumer"); return thread; }); // 启动消费者 executorService.execute(() -> { try { consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel)); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } }); OrderMessage message = new OrderMessage("10086"); producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN, message.getDescription().getBytes(StandardCharsets.UTF_8)); LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId()); message = new OrderMessage("10087"); producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN, message.getDescription().getBytes(StandardCharsets.UTF_8)); LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId()); message = new OrderMessage("10088"); producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN, message.getDescription().getBytes(StandardCharsets.UTF_8)); LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId()); Thread.sleep(Integer.MAX_VALUE); } private static class DlxConsumer extends DefaultConsumer { DlxConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { LOGGER.info("处理消息成功:{}", new String(body, StandardCharsets.UTF_8)); } } private static class OrderMessage { private final String orderId; private final long timestamp; private final String description; OrderMessage(String orderId) { this.orderId = orderId; this.timestamp = System.currentTimeMillis(); this.description = String.format("订单[%s],订单创建时间为:%s", orderId, LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F)); } public String getOrderId() { return orderId; } public long getTimestamp() { return timestamp; } public String getDescription() { return description; } } } 运行 16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10086 16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10087 16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10088 16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58 16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58 16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58 时间轮 时间轮 这里暂时不对时间轮和其实现作分析,只简单举例说明怎么使用时间轮实现延时任务。这里使用 <dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.39.Final</version> </dependency> 代码如下: public class HashedWheelTimerMain { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); public static void main(String[] args) throws Exception { AtomicInteger counter = new AtomicInteger(); ThreadFactory factory = r -> { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement()); return thread; }; // tickDuration - 每tick一次的时间间隔, 每tick一次就会到达下一个槽位 // unit - tickDuration的时间单位 // ticksPerWhee - 时间轮中的槽位数 Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60); TimerTask timerTask = new DefaultTimerTask("10086"); timer.newTimeout(timerTask, 5, TimeUnit.SECONDS); timerTask = new DefaultTimerTask("10087"); timer.newTimeout(timerTask, 10, TimeUnit.SECONDS); timerTask = new DefaultTimerTask("10088"); timer.newTimeout(timerTask, 15, TimeUnit.SECONDS); Thread.sleep(Integer.MAX_VALUE); } private static class DefaultTimerTask implements TimerTask { private final String orderId; private final long timestamp; public DefaultTimerTask(String orderId) { this.orderId = orderId; this.timestamp = System.currentTimeMillis(); } @Override public void run(Timeout timeout) throws Exception { System.out.println(String.format("任务执行时间:%s,订单创建时间:%s,订单ID:%s", LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId)); } } } 运行结果:
一般来说,任务执行的时候应该使用另外的业务线程池,以免阻塞时间轮本身的运动。 选用的方案实现过程 最终选用了基于
对于第4点处理有两种方案:
最终暂时选用了方案一,也就是从 这里先详细说明一下用到的 Sorted Set相关命令
max:分数区间 - 最大分数。 min:分数区间 - 最小分数。 WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。 LIMIT:可选参数,offset和count原理和
Hash相关命令
Lua相关 加载 PS:如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。 引入依赖: <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.1.7.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.1</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>5.1.9.RELEASE</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.59</version> </dependency> </dependencies> 编写 -- /lua/enqueue.lua local zset_key = KEYS[1] local hash_key = KEYS[2] local zset_value = ARGV[1] local zset_score = ARGV[2] local hash_field = ARGV[3] local hash_value = ARGV[4] redis.call('ZADD', zset_key, zset_score, zset_value) redis.call('HSET', hash_key, hash_field, hash_value) return nil -- /lua/dequeue.lua -- 参考jesque的部分Lua脚本实现 local zset_key = KEYS[1] local hash_key = KEYS[2] local min_score = ARGV[1] local max_score = ARGV[2] local offset = ARGV[3] local limit = ARGV[4] -- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代 local status, type = next(redis.call('TYPE', zset_key)) if status ~= nil and status == 'ok' then if type == 'zset' then local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit) if list ~= nil and #list > 0 then -- unpack函数能把table转化为可变参数 redis.call('ZREM', zset_key, unpack(list)) local result = redis.call('HMGET', hash_key, unpack(list)) redis.call('HDEL', hash_key, unpack(list)) return result end end end return nil 编写核心API代码: // Jedis提供者 @Component public class JedisProvider implements InitializingBean { private JedisPool jedisPool; @Override public void afterPropertiesSet() throws Exception { jedisPool = new JedisPool(); } public Jedis provide(){ return jedisPool.getResource(); } } // OrderMessage @Data public class OrderMessage { private String orderId; private BigDecimal amount; private Long userId; } // 延迟队列接口 public interface OrderDelayQueue { void enqueue(OrderMessage message); List<OrderMessage> dequeue(String min, String max, String offset, String limit); List<OrderMessage> dequeue(); String enqueueSha(); String dequeueSha(); } // 延迟队列实现类 @RequiredArgsConstructor @Component public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean { private static final String MIN_SCORE = "0"; private static final String OFFSET = "0"; private static final String LIMIT = "10"; private static final String ORDER_QUEUE = "ORDER_QUEUE"; private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE"; private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua"; private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua"; private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>(); private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>(); private static final List<String> KEYS = Lists.newArrayList(); private final JedisProvider jedisProvider; static { KEYS.add(ORDER_QUEUE); KEYS.add(ORDER_DETAIL_QUEUE); } @Override public void enqueue(OrderMessage message) { List<String> args = Lists.newArrayList(); args.add(message.getOrderId()); args.add(String.valueOf(System.currentTimeMillis())); args.add(message.getOrderId()); args.add(JSON.toJSONString(message)); try (Jedis jedis = jedisProvider.provide()) { jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args); } } @Override public List<OrderMessage> dequeue() { // 30分钟之前 String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000); return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT); } @SuppressWarnings("unchecked") @Override public List<OrderMessage> dequeue(String min, String max, String offset, String limit) { List<String> args = new ArrayList<>(); args.add(max); args.add(min); args.add(offset); args.add(limit); List<OrderMessage> result = Lists.newArrayList(); try (Jedis jedis = jedisProvider.provide()) { List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args); if (null != eval) { for (String e : eval) { result.add(JSON.parseObject(e, OrderMessage.class)); } } } return result; } @Override public String enqueueSha() { return ENQUEUE_LUA_SHA.get(); } @Override public String dequeueSha() { return DEQUEUE_LUA_SHA.get(); } @Override public void afterPropertiesSet() throws Exception { // 加载Lua脚本 loadLuaScript(); } private void loadLuaScript() throws Exception { try (Jedis jedis = jedisProvider.provide()) { ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION); String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); String sha = jedis.scriptLoad(luaContent); ENQUEUE_LUA_SHA.compareAndSet(null, sha); resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION); luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); sha = jedis.scriptLoad(luaContent); DEQUEUE_LUA_SHA.compareAndSet(null, sha); } } public static void main(String[] as) throws Exception { DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); JedisProvider jedisProvider = new JedisProvider(); jedisProvider.afterPropertiesSet(); RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider); queue.afterPropertiesSet(); // 写入测试数据 OrderMessage message = new OrderMessage(); message.setAmount(BigDecimal.valueOf(10086)); message.setOrderId("ORDER_ID_10086"); message.setUserId(10086L); message.setTimestamp(LocalDateTime.now().format(f)); List<String> args = Lists.newArrayList(); args.add(message.getOrderId()); // 测试需要,score设置为30分钟之前 args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)); args.add(message.getOrderId()); args.add(JSON.toJSONString(message)); try (Jedis jedis = jedisProvider.provide()) { jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args); } List<OrderMessage> dequeue = queue.dequeue(); System.out.println(dequeue); } } 这里先执行一次 [OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)] 确定延迟队列的代码没有问题,接着编写一个 @DisallowConcurrentExecution @Component public class OrderMessageConsumer implements Job { private static final AtomicInteger COUNTER = new AtomicInteger(); private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement()); return thread; }); private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class); @Autowired private OrderDelayQueue orderDelayQueue; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { StopWatch stopWatch = new StopWatch(); stopWatch.start(); LOGGER.info("订单消息处理定时任务开始执行......"); List<OrderMessage> messages = orderDelayQueue.dequeue(); if (!messages.isEmpty()) { // 简单的列表等分放到线程池中执行 List<List<OrderMessage>> partition = Lists.partition(messages, 2); int size = partition.size(); final CountDownLatch latch = new CountDownLatch(size); for (List<OrderMessage> p : partition) { BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch)); } try { latch.await(); } catch (InterruptedException ignore) { //ignore } } stopWatch.stop(); LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis()); } @RequiredArgsConstructor private static class ConsumeTask implements Runnable { private final List<OrderMessage> messages; private final CountDownLatch latch; @Override public void run() { try { // 实际上这里应该单条捕获异常 for (OrderMessage message : messages) { LOGGER.info("处理订单信息,内容:{}", message); } } finally { latch.countDown(); } } } } 上面的消费者设计的时候需要有以下考量:
其他 // Quartz配置类 @Configuration public class QuartzAutoConfiguration { @Bean public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setAutoStartup(true); factory.setJobFactory(quartzAutowiredJobFactory); return factory; } @Bean public QuartzAutowiredJobFactory quartzAutowiredJobFactory() { return new QuartzAutowiredJobFactory(); } public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware { private AutowireCapableBeanFactory autowireCapableBeanFactory; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory; } @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); // 这里利用AutowireCapableBeanFactory从新建的Job实例做一次自动装配,得到一个原型(prototype)的JobBean实例 autowireCapableBeanFactory.autowireBean(jobInstance); return jobInstance; } } } 这里暂时使用了内存态的 @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class}) public class Application implements CommandLineRunner { @Autowired private Scheduler scheduler; @Autowired private JedisProvider jedisProvider; public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Override public void run(String... args) throws Exception { // 准备一些测试数据 prepareOrderMessageData(); JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class) .withIdentity("OrderMessageConsumer", "DelayTask") .build(); // 触发器5秒触发一次 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("OrderMessageConsumerTrigger", "DelayTask") .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()) .build(); scheduler.scheduleJob(job, trigger); } private void prepareOrderMessageData() throws Exception { DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); try (Jedis jedis = jedisProvider.provide()) { List<OrderMessage> messages = Lists.newArrayList(); for (int i = 0; i < 100; i++) { OrderMessage message = new OrderMessage(); message.setAmount(BigDecimal.valueOf(i)); message.setOrderId("ORDER_ID_" + i); message.setUserId((long) i); message.setTimestamp(LocalDateTime.now().format(f)); messages.add(message); } // 这里暂时不使用Lua Map<String, Double> map = Maps.newHashMap(); Map<String, String> hash = Maps.newHashMap(); for (OrderMessage message : messages) { // 故意把score设计成30分钟前 map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000))); hash.put(message.getOrderId(), JSON.toJSONString(message)); } jedis.zadd("ORDER_QUEUE", map); jedis.hmset("ORDER_DETAIL_QUEUE", hash); } } } 输出结果如下:
首次执行的时候涉及到一些组件的初始化,会比较慢,后面看到由于我们只是简单打印订单信息,所以定时任务执行比较快。如果在不调整当前架构的情况下,生产中需要注意:
|
请发表评论