业务场景
商城订单生成30分钟后 如果未支付关闭订单
解决办法
可以使用延迟消息队列 这里我们用的是beanstalkd
Beanstalkd介绍
Beanstalk,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。
Beanstalkd特性
1、支持优先级(支持任务插队) 2、延迟(实现定时任务) 3、持久化(定时把内存中的数据刷到binlog日志) 4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理) 5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)
任务job状态
delayed 延迟状态 ready 准备好状态 reserved 消费者把任务读出来,处理时 buried 预留状态 delete 删除状态
Beanstalkd安装
yum -y install beanstalkd
Beanstalkd守护进程启动
nohup beanstalkd -l 0.0.0.0 -p 11300 -b /home/software/binstalkd/binlogs &
1. -b表示开启binlog,断电后重启自动恢复任务
2. /home/software/binstalkd/binlogs是我们已经创建好的一个文件目录 这里面主要放一些持久化的日志
Pheanstalk安装
composer require pda/pheanstalk
PheanstalkGit仓库地址
连接Beanstalkd
<?php
require __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;
/**
* 实例化beanstalk
* 参数依次为:ip地址 端口号默认11300 连接超时时间 是否长连接
*/
$pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
?>
producer.php 生产者
<?php
// Hopefully you're using Composer autoloading.
require_once __DIR__.'/vendor/autoload.php';
use Pheanstalk\Pheanstalk;
/**
* 实例化beanstalk
* 参数依次为:ip地址 端口号默认11300 连接超时时间 是否长连接
*/
$pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
$arr['name'] = 'mark';
$jobData = json_encode($arr);
$pheanstalk ->useTube('tubeName') ->put($jobData,1024,5);
Beanstalkd生产者方法
指定需要使用的管道
$tube = $pheanstalk->useTube('default');
向管道插入数据
$tube = $pheanstalk->useTube('default');
$put = $tube->put(
'hello, beanstalk',
或者:
$pheanstalk->putInTube('default', 'test1', 1024, 10, 60);
---------------------------------------------------------------------------------------------------------------------+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
consumer.php 消费者
<?php
// Hopefully you're using Composer autoloading.
require_once __DIR__.'/vendor/autoload.php';
use Pheanstalk\Pheanstalk;
/**
* 实例化beanstalk
* 参数依次为:ip地址 端口号默认11300 连接超时时间 是否长连接
*/
$pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
//监听tubeName管道,忽略default管道
$job = $pheanstalk ->watch('tubeName') ->ignore('default') ->reserve();
echo $job->getData();
//删除当前任务
$pheanstalk->delete($job);
Beanstalkd消费者方法
监听管道
$tube = $pheanstalk->watch('user');
去除不需要监听的管道
$tube = $pheanstalk->watch('user')->ignore('default');
以堵塞的方式监听管道
$job = $pheanstalk->watch('user')->reserve(4);
列出所有已经监听的管道
$pheanstalk->listTubesWatched();
watch + reserve 方法
$pheanstalk->reserveFromTube('default')
删除当前任务
$job = $pheanstalk->watch('default')->reserve();
$pheanstalk->delete($job);
将当前任务重新放入管道
$job = $pheanstalk->watch('default')->reserve();
$pheanstalk->release($job);
为任务续命(当处理任务的时间小于当前任务执行时间时)
$job = $pheanstalk->watch('default')->reserve();
$pheanstalk->touch($job);
将任务预留
$job = $pheanstalk->watch('default')->reserve();
$pheanstalk->bury($job);
将预留任务释放(变为reday状态)
$job = $pheanstalk->peekBuried('default');
$pheanstalk->kickJob($job);
批量将预留任务释放
$pheanstalk->userTube('default')->kick(999);
读取当前准备就绪的任务(ready)
$job = $pheanstalk->peekReady('default');
读取当前处于延迟状态的任务(delayed)
$job = $pheanstalk->peekDelayed('default');
对管道设置延迟
$pheanstalk->pauseTube('default', 100);
取消对管道的延迟
$pheanstalk->resumeTube('default');
|
请发表评论