在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
RabbitMQ的安装过程,工作流程,和一些基础概念已经在前面的笔记中提到了,今天在本地实现了php连接RabbitMQ,以及消息的生产和消费的过程,首先看下没有生产者和消费者的默认RabbitMQ管理界面截图: 还没有任何连接(Connections) 还没有任何通道(Channels)
还没有任何队列 <?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/15 * Time: 13:16 */ //声明连接参数 $config = array( 'host' => '192.168.31.247', 'vhost' => 'my_vhost', 'port' => 5672, 'login' => 'admin', 'password' => 'admin' ); date_default_timezone_set("Asia/Shanghai"); //连接broker $cnn = new AMQPConnection($config); if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit(); } //在连接内创建一个通道 $ch = new AMQPChannel($cnn); //创建一个交换机 $ex = new AMQPExchange($ch); //声明路由键 $routingKey = 'key_1'; //声明交换机名称 $exchangeName = 'exchange_1'; //设置交换机名称 $ex->setName($exchangeName); //设置交换机类型 //AMQP_EX_TYPE_DIRECT:直连交换机 //AMQP_EX_TYPE_FANOUT:扇形交换机 //AMQP_EX_TYPE_HEADERS:头交换机 //AMQP_EX_TYPE_TOPIC:主题交换机 $ex->setType(AMQP_EX_TYPE_DIRECT); //设置交换机持久 $ex->setFlags(AMQP_DURABLE); //声明交换机 $ex->declareExchange(); //创建一个消息队列 $q = new AMQPQueue($ch); //设置队列名称 $q->setName('queue_1'); //设置队列持久 $q->setFlags(AMQP_DURABLE); //声明消息队列 $q->declareQueue(); //交换机和队列通过$routingKey进行绑定 $q->bind($ex->getName(), $routingKey); //接收消息并进行处理的回调方法 function receive($envelope, $queue) { //休眠两秒, sleep(2); //echo消息内容 echo $envelope->getBody()."\n"; //显式确认,队列收到消费者显式确认后,会删除该消息 $queue->ack($envelope->getDeliveryTag()); } //设置消息队列消费者回调方法,并进行阻塞 $q->consume("receive"); //$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐 以上是消费者代码,打开两个命令行/终端 此时再看RabbitMQ管理界面: 出现两个连接,这两个就是消费者,因为他们在阻塞着等待消息 消费者在各自的连接里都打开了一个通道 其中一个消费者创建了一个持久的直连交换机 消息队列已经创建,但消息数是0,因为此时还没有生产者 <?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/15 * Time: 13:15 */ $config = array( 'host' => '192.168.31.247', 'vhost' => 'my_vhost', 'port' => 5672, 'login' => 'admin', 'password' => 'admin' ); $cnn = new AMQPConnection($config); if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit(); } $ch = new AMQPChannel($cnn); $ex = new AMQPExchange($ch); //消息的路由键,一定要和消费者端一致 $routingKey = 'key_1'; //交换机名称,一定要和消费者端一致, $exchangeName = 'exchange_1'; $ex->setName($exchangeName); $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); $ex->declareExchange(); //创建10个消息 for ($i=1;$i<=10;$i++){ //消息内容 $msg = array( 'data' => 'message_'.$i, 'hello' => 'world', ); //发送消息到交换机,并返回发送结果 //delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失 echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n"; //代码执行完毕后进程会自动退出 } 以上是生产者代码 因为我们执行生产者之前已经关掉了全部消费者,所以此时消息在队列中等待获取; 提醒:生产者在生产消息时,如果不存在指定队列,并且没有创建队列,或者队列存在但消息路由键和交换机与队列绑定的键(路由规则)不一致(直连交换机必须一致),则消息会被交换机丢弃。 |
2022-08-30
2022-08-17
2022-11-06
2022-08-17
2022-07-18
请发表评论