• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

imi-amqp: 支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ ...

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

imi-amqp

开源软件地址:

https://gitee.com/imiphp/imi-amqp

开源软件介绍:

imi-amqp

Latest VersionPhp VersionSwoole VersionIMI License

介绍

支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ

支持消息发布和消费

Composer

本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:

{    "require": {        "imiphp/imi-amqp": "^1.2.0"    }}

然后执行 composer update 安装。

使用说明

可以参考 example 目录示例,包括完整的消息发布和消费功能。

在项目 config/config.php 中配置:

[    'components'    =>  [        // 引入组件        'AMQP'   =>  'Imi\AMQP',    ],]

连接池配置:

[    'pools'    =>    [        'rabbit'    =>  [            'sync'    =>    [                'pool'    =>    [                    'class'        =>    \Imi\AMQP\Pool\AMQPSyncPool::class,                    'config'    =>    [                        'maxResources'    =>    10,                        'minResources'    =>    0,                    ],                ],                'resource'    =>    [                    'host'      => '127.0.0.1',                    'port'      => 5672,                    'user'      => 'guest',                    'password'  => 'guest',                ]            ],            'async'    =>    [                'pool'    =>    [                    'class'        =>    \Imi\AMQP\Pool\AMQPCoroutinePool::class,                    'config'    =>    [                        'maxResources'    =>    10,                        'minResources'    =>    1,                    ],                ],                'resource'    =>    [                    'host'      => '127.0.0.1',                    'port'      => 5672,                    'user'      => 'guest',                    'password'  => 'guest',                ]            ],        ],    ]]

默认连接池:

[    'beans' =>  [        'AMQP'  =>  [            'defaultPoolName'   =>  'rabbit',        ],    ],]

连接配置项

属性名称说明
host主机
port端口
user用户名
vhostvhost,默认/
insistinsist
loginMethod默认AMQPLAIN
loginResponseloginResponse
locale默认en_US
connectionTimeout连接超时
readWriteTimeout读写超时
keepalivekeepalive,默认false
heartbeat心跳时间,默认0
channelRpcTimeout频道 RPC 超时时间,默认0.0
sslProtocolssl 协议,默认null

消息定义

继承 Imi\AMQP\Message 类,可在构造方法中对属性修改。

根据需要可以覆盖实现setBodyDatagetBodyData方法,实现自定义的消息结构。

<?phpnamespace ImiApp\AMQP\Test2;use Imi\AMQP\Message;class TestMessage2 extends Message{    /**     * 用户ID     *     * @var int     */    private $memberId;    /**     * 内容     *     * @var string     */    private $content;    public function __construct()    {        parent::__construct();        $this->routingKey = 'imi-2';        $this->format = \Imi\Util\Format\Json::class;    }    /**     * 设置主体数据     *     * @param mixed $data     * @return self     */    public function setBodyData($data)    {        foreach($data as $k => $v)        {            $this->$k = $v;        }    }    /**     * 获取主体数据     *     * @return mixed     */    public function getBodyData()    {        return [            'memberId'  =>  $this->memberId,            'content'   =>  $this->content,        ];    }    /**     * Get 用户ID     *     * @return int     */     public function getMemberId()    {        return $this->memberId;    }    /**     * Set 用户ID     *     * @param int $memberId  用户ID     *     * @return self     */     public function setMemberId(int $memberId)    {        $this->memberId = $memberId;        return $this;    }    /**     * Get 内容     *     * @return string     */     public function getContent()    {        return $this->content;    }    /**     * Set 内容     *     * @param string $content  内容     *     * @return self     */     public function setContent(string $content)    {        $this->content = $content;        return $this;    }}

属性列表:

名称说明默认值
bodyData消息主体内容,非字符串null
properties属性['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,]
routingKey路由键空字符串
format如果设置了,发布的消息是编码后的bodyData,同理读取时也会解码。实现了Imi\Util\Format\IFormat的格式化类。支持JsonPhpSerializenull
mandatorymandatory标志位false
immediateimmediate标志位false
ticketticketnull

发布者

必选注解:@Publisher

可选注解:@Queue@Exchange@Connection

不配置 @Connection 注解,可以从连接池中获取连接

<?phpnamespace ImiApp\AMQP\Test;use Imi\Bean\Annotation\Bean;use Imi\AMQP\Annotation\Queue;use Imi\AMQP\Base\BasePublisher;use Imi\AMQP\Annotation\Consumer;use Imi\AMQP\Annotation\Exchange;use Imi\AMQP\Annotation\Publisher;use Imi\AMQP\Annotation\Connection;/** * @Bean("TestPublisher") * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest") * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1") * @Queue(name="queue-imi-1", routingKey="imi-1") * @Exchange(name="exchange-imi") */class TestPublisher extends BasePublisher{}

消费者

必选注解:@Consumer

可选注解:@Queue@Exchange@Connection

不配置 @Connection 注解,可以从连接池中获取连接

<?phpnamespace ImiApp\AMQP\Test;use Imi\Redis\Redis;use Imi\Bean\Annotation\Bean;use Imi\AMQP\Annotation\Queue;use Imi\AMQP\Base\BaseConsumer;use Imi\AMQP\Contract\IMessage;use Imi\AMQP\Annotation\Consumer;use Imi\AMQP\Annotation\Exchange;use Imi\AMQP\Enum\ConsumerResult;use Imi\AMQP\Annotation\Connection;/** * 启动一个新连接消费 *  * @Bean("TestConsumer") * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest") * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\ImiApp\AMQP\Test\TestMessage::class) */class TestConsumer extends BaseConsumer{    /**     * 消费任务     *     * @param \ImiApp\AMQP\Test\TestMessage $message     * @return void     */    protected function consume(IMessage $message)    {        var_dump(__CLASS__, $message->getBody(), get_class($message));        Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody());        return ConsumerResult::ACK;    }}

注解说明

@Publisher

发布者注解

属性名称说明
queue队列名称
exchange交换机名称
routingKey路由键

@Consumer

消费者注解

属性名称说明
tag消费者标签
queue队列名称
exchange交换机名称
routingKey路由键
message消息类名,默认:Imi\AMQP\Message
mandatorymandatory标志位
immediateimmediate标志位
ticketticket

@Queue

队列注解

属性名称说明
name队列名称
routingKey路由键
passive被动模式,默认false
durable消息队列持久化,默认true
exclusive独占,默认false
autoDelete自动删除,默认false
nowait是否非阻塞,默认false
arguments参数
ticketticket

@Exchange

交换机注解

属性名称说明
name交换机名称
type类型可选:directfanouttopicheaders
passive被动模式,默认false
durable消息队列持久化,默认true
autoDelete自动删除,默认false
internal设置是否为rabbitmq内部使用, true表示是内部使用, false表示不是内部使用
nowait是否非阻塞,默认false
arguments参数
ticketticket

@Connection

连接注解

属性名称说明
poolName不为 null 时,无视其他属性,直接用该连接池配置。默认为null,如果hostportuserpassword都未设置,则获取默认的连接池。
host主机
port端口
user用户名
vhostvhost,默认/
insistinsist
loginMethod默认AMQPLAIN
loginResponseloginResponse
locale默认en_US
connectionTimeout连接超时
readWriteTimeout读写超时
keepalivekeepalive,默认false
heartbeat心跳时间,默认0
channelRpcTimeout频道 RPC 超时时间,默认0.0
sslProtocolssl 协议,默认null

队列组件支持

本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。

只需要将队列驱动配置为:AMQPQueueDriver

配置示例:

[    'components'    =>  [        'AMQP'  =>  'Imi\AMQP',    ],    'beans' =>  [        'AutoRunProcessManager' =>  [            'processes' =>  [                // 加入队列消费进程,非必须,你也可以自己写进程消费                'QueueConsumer',            ],        ],        'imiQueue'  =>  [            // 默认队列            'default'   =>  'test1',            // 队列列表            'list'  =>  [                // 队列名称                'test1' =>  [                    // 使用的队列驱动                    'driver'        =>  'AMQPQueueDriver',                    // 消费协程数量                    'co'            =>  1,                    // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准                    'process'       =>  1,                    // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)                    'timespan'      =>  0.1,                    // 进程分组名称                    'processGroup'  =>  'a',                    // 自动消费                    'autoConsumer'  =>  true,                    // 消费者类                    'consumer'      =>  'AConsumer',                    // 驱动类所需要的参数数组                    'config'        =>  [                        // AMQP 连接池名称                        'poolName'      =>  'amqp',                        // Redis 连接池名称                        'redisPoolName;'=>  'redis',                        // Redis 键名前缀                        'redisPrefix'   =>  'test1:',                        // 可选配置:                        // 支持消息删除功能,依赖 Redis                        'supportDelete' =>  true,                        // 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列                        'supportTimeout' =>  true,                        // 支持消费失败队列功能,自动增加一个队列                        'supportFail' =>  true,                        // 循环尝试 pop 的时间间隔,单位:秒                        'timespan'  =>  0.03,                        // 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。                        'queueLength'   =>  16,                        // 消息类名                        'message'   =>  \Imi\AMQP\Queue\JsonAMQPMessage::class,                    ]                ],            ],        ],    ]]

消费者类写法,与imi-queue组件用法一致。

免费技术支持

QQ群:17916227 点击加群,如有问题会有人解答和修复。

运行环境

版权信息

imi-amqp 遵循 MIT 开源协议发布,并提供免费使用。

捐赠

开源不求盈利,多少都是心意,生活不易,随缘随缘……


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap