在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:imi-amqp开源软件地址:https://gitee.com/imiphp/imi-amqp开源软件介绍:imi-amqp介绍支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ 支持消息发布和消费 Composer本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 { "require": { "imiphp/imi-amqp": "^1.2.0" }} 然后执行 使用说明可以参考 在项目 [ 'components' => [ // 引入组件 'AMQP' => 'Imi\AMQP', ],] 连接池配置: [ 'pools' => [ 'rabbit' => [ 'sync' => [ 'pool' => [ 'class' => \Imi\AMQP\Pool\AMQPSyncPool::class, 'config' => [ 'maxResources' => 10, 'minResources' => 0, ], ], 'resource' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', ] ], 'async' => [ 'pool' => [ 'class' => \Imi\AMQP\Pool\AMQPCoroutinePool::class, 'config' => [ 'maxResources' => 10, 'minResources' => 1, ], ], 'resource' => [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', ] ], ], ]] 默认连接池: [ 'beans' => [ 'AMQP' => [ 'defaultPoolName' => 'rabbit', ], ],] 连接配置项
消息定义继承 根据需要可以覆盖实现 <?phpnamespace ImiApp\AMQP\Test2;use Imi\AMQP\Message;class TestMessage2 extends Message{ /** * 用户ID * * @var int */ private $memberId; /** * 内容 * * @var string */ private $content; public function __construct() { parent::__construct(); $this->routingKey = 'imi-2'; $this->format = \Imi\Util\Format\Json::class; } /** * 设置主体数据 * * @param mixed $data * @return self */ public function setBodyData($data) { foreach($data as $k => $v) { $this->$k = $v; } } /** * 获取主体数据 * * @return mixed */ public function getBodyData() { return [ 'memberId' => $this->memberId, 'content' => $this->content, ]; } /** * Get 用户ID * * @return int */ public function getMemberId() { return $this->memberId; } /** * Set 用户ID * * @param int $memberId 用户ID * * @return self */ public function setMemberId(int $memberId) { $this->memberId = $memberId; return $this; } /** * Get 内容 * * @return string */ public function getContent() { return $this->content; } /** * Set 内容 * * @param string $content 内容 * * @return self */ public function setContent(string $content) { $this->content = $content; return $this; }} 属性列表:
发布者必选注解: 可选注解: 不配置 <?phpnamespace ImiApp\AMQP\Test;use Imi\Bean\Annotation\Bean;use Imi\AMQP\Annotation\Queue;use Imi\AMQP\Base\BasePublisher;use Imi\AMQP\Annotation\Consumer;use Imi\AMQP\Annotation\Exchange;use Imi\AMQP\Annotation\Publisher;use Imi\AMQP\Annotation\Connection;/** * @Bean("TestPublisher") * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest") * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1") * @Queue(name="queue-imi-1", routingKey="imi-1") * @Exchange(name="exchange-imi") */class TestPublisher extends BasePublisher{} 消费者必选注解: 可选注解: 不配置 <?phpnamespace ImiApp\AMQP\Test;use Imi\Redis\Redis;use Imi\Bean\Annotation\Bean;use Imi\AMQP\Annotation\Queue;use Imi\AMQP\Base\BaseConsumer;use Imi\AMQP\Contract\IMessage;use Imi\AMQP\Annotation\Consumer;use Imi\AMQP\Annotation\Exchange;use Imi\AMQP\Enum\ConsumerResult;use Imi\AMQP\Annotation\Connection;/** * 启动一个新连接消费 * * @Bean("TestConsumer") * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest") * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\ImiApp\AMQP\Test\TestMessage::class) */class TestConsumer extends BaseConsumer{ /** * 消费任务 * * @param \ImiApp\AMQP\Test\TestMessage $message * @return void */ protected function consume(IMessage $message) { var_dump(__CLASS__, $message->getBody(), get_class($message)); Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody()); return ConsumerResult::ACK; }} 注解说明@Publisher发布者注解
@Consumer消费者注解
@Queue队列注解
@Exchange交换机注解
@Connection连接注解
队列组件支持本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。 只需要将队列驱动配置为: 配置示例: [ 'components' => [ 'AMQP' => 'Imi\AMQP', ], 'beans' => [ 'AutoRunProcessManager' => [ 'processes' => [ // 加入队列消费进程,非必须,你也可以自己写进程消费 'QueueConsumer', ], ], 'imiQueue' => [ // 默认队列 'default' => 'test1', // 队列列表 'list' => [ // 队列名称 'test1' => [ // 使用的队列驱动 'driver' => 'AMQPQueueDriver', // 消费协程数量 'co' => 1, // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准 'process' => 1, // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效) 'timespan' => 0.1, // 进程分组名称 'processGroup' => 'a', // 自动消费 'autoConsumer' => true, // 消费者类 'consumer' => 'AConsumer', // 驱动类所需要的参数数组 'config' => [ // AMQP 连接池名称 'poolName' => 'amqp', // Redis 连接池名称 'redisPoolName;'=> 'redis', // Redis 键名前缀 'redisPrefix' => 'test1:', // 可选配置: // 支持消息删除功能,依赖 Redis 'supportDelete' => true, // 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列 'supportTimeout' => true, // 支持消费失败队列功能,自动增加一个队列 'supportFail' => true, // 循环尝试 pop 的时间间隔,单位:秒 'timespan' => 0.03, // 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。 'queueLength' => 16, // 消息类名 'message' => \Imi\AMQP\Queue\JsonAMQPMessage::class, ] ], ], ], ]] 消费者类写法,与 免费技术支持运行环境版权信息
捐赠开源不求盈利,多少都是心意,生活不易,随缘随缘…… |
请发表评论