在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
1. RabbitMQ是什么?
生活场景: 1.其实我们在双11的时候,当我们凌晨大量的秒杀和抢购商品,然后去结算的时候,就会发现,界面会提醒我们,让我们稍等,以及一些友好的图片文字提醒。而不是像前几年的时代,动不动就页面卡死,报错等来呈现给用户。 在这业务场景中,我们就可以采用队列的机制来处理,因为同时结算就只能达到这么多。 2.在我们平时的超市中购物也是一样,当我们在结算的时候,并不会一窝蜂一样涌入收银台,而是排队结算。这也是队列机制。 2. RabbitMQ简介AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。
2.1 定义和特征
2.2 安装
安装erlang # centos7 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.17/rabbitmq-server-3.7.17-1.el7.noarch.rpm yum install epel-release yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl rpm -ivh esl-erlang_22.0.7-1~centos~7_amd64.rpm 安装rabbitmq wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-3.8.0-1.el7.noarch.rpm yum -y install socat rpm -ivh rabbitmq-server-3.8.0-1.el7.noarch.rpm 启动 chkconfig rabbitmq-server on # 开机启动 systemctl start rabbitmq-server.service # 启动 systemctl stop rabbitmq-server.service # 停止 systemctl restart rabbitmq-server.service # 重启 rabbitmqctl status # 查看状态 rabbitmq-plugins enable rabbitmq_management # 启动Web管理器
修改配置 vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/ebin/rabbit.app 将:{loopback_users, [<<”guest”>>]}, 改为:{loopback_users, []}, 原因:rabbitmq从3.3.0开始禁止使用guest/guest权限通过 除localhost外的访问 systemctl restart rabbitmq-server.service # 重启服务 3. RabbitMQ核心概念
AMQP消息路由
AMQP中消息的路由过程和JMS存在一些差别。AMQP中增加了Exchange和Binging的角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到哪个队列。
Exchange分发消息时,根据类型的不同分发策略有区别。目前共四种类型:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,目前几乎用不到了。所以直接看另外三种类型。)。 direct消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配。 fanout每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理该路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。 topic4. RabbitMQ的运行模式
创建实例
package RabbitMQ import ( "fmt" "github.com/streadway/amqp" "log" ) //url格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost const MQURL = "amqp://zhangyafei:[email protected]:5672/imooc" type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel // 队列名称 QueueName string //交换机 Exchange string //key Key string // 连接信息 Mqurl string } //创建结构体实例 func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL} var err error // 创建rabbitmq连接 rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "创建连接错误!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "获取channel失败!") return rabbitmq } //断开channel和connection func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() } //错误处理函数 func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { log.Fatalf("%s:%s", message, err) panic(fmt.Sprintf("%s:%s", message, err)) } } // 简单模式step1: 1.创建简单模式下的rabbitmq实例 func NewRabbitMQSimple(queueName string) *RabbitMQ { return NewRabbitMQ(queueName, "", "") } // 简单模式step2: 2.简单模式下生产 func (r *RabbitMQ) PublishSimple(message string) { // 1. 申请队列,如果队列不存在则自动创建,如果存在则跳过创建 // 保证队列存在,消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 false, // 额外属性 nil, ) if err != nil { fmt.Println(err) } // 2. 发送消息到队列中 r.channel.Publish( r.Exchange, r.QueueName, // 如果为true,根据exchange类型和routekey规则,如果无法找到符合条件的队列,则会把发送的消息返回给发送者 false, // 如果为true,当exchange发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者 false, amqp.Publishing{ContentType: "text/plain", Body: []byte(message)}, ) } // 简单模式step3: 3.简单模式下消费 func (r *RabbitMQ) ConsumeSimple() { // 1. 申请队列,如果队列不存在则自动创建,如果存在则跳过创建 // 保证队列存在,消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 false, // 额外属性 nil, ) if err != nil { fmt.Println(err) } // 2. 接收消息 msgs, err := r.channel.Consume( r.QueueName, // 用来区分多个消费者 "", // 是否自动应答 true, // 是否具有排他性 false, // 如果为true,表示不能将同一个conn中的消息发送给这个conn中的消费者 false, // 队列是否阻塞 false, nil, ) if err != nil { fmt.Println(err) } forever := make(chan bool) // 3. 启用协程处理消息 go func() { for d := range msgs { // 实现我们要处理的逻辑函数 log.Printf("Received a message: %s", d.Body) } }() log.Printf("[*] waiting for messages, to exit process CTRL+C") <-forever } 生产者
package main import ( "RabbitMQ/RabbitMQ/RabbitMQ" "fmt" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") rabbitmq.PublishSimple("hello imooc!") fmt.Println("发送成功") } 消费者 package main import ( "RabbitMQ/RabbitMQ/RabbitMQ" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") rabbitmq.ConsumeSimple() }
生产者
package main import ( "RabbitMQ/RabbitMQ/RabbitMQ" "fmt" "strconv" "time" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") for i := 0; i<= 100; i++ { rabbitmq.PublishSimple("hello imooc!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } } 消费者 package main import ( "RabbitMQ/RabbitMQ/RabbitMQ" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") rabbitmq.ConsumeSimple() }
创建实例
// 订阅模式下创建RabbitMQ实例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ { rabbitmq := NewRabbitMQ("", exchangeName, "") var err error // 获取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connect rabbitmq!") // 获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } // 订阅模式下生产 func (r *RabbitMQ) PublishPub(message string) { // 1. 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, "fanout", // 广播类型 true, // 持久化 false, // 是否删除 false, // true表示这个exchange不可以被client用来推送消息的,仅用来进行exchange和exchange之间的绑定 false, nil, ) r.failOnErr(err, "Failed to declare a exchange") // 2. 发送消息 err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } // 订阅模式消费端的代码 func (r *RabbitMQ) ReceiveSub() { // 1. 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, "fanout", // 广播类型 true, // 持久化 false, // 是否删除 false, // true表示这个exchange不可以被client用来推送消息的,仅用来进行exchange和exchange之间的绑定 false, nil, ) r.failOnErr(err, "Failed to declare a exchange") // 2. 试探性创建队列 q, err := r.channel.QueueDeclare( "", // 随机生产队列名称 false, false, true, false, nil, ) r.failOnErr(err, "failed to declare a queue") // 绑定队列到 exchange中 err = r.channel.QueueBind( q.Name, "", // 在订阅模式下,这里的key为空 r.Exchange, false, nil) // 消费消息 messages, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) if err != nil { fmt.Println(err) } forever := make(chan bool) // 3. 启用协程处理消息 go func() { for d := range messages { // 实现我们要处理的逻辑函数 log.Printf("Received a message: %s", d.Body) } }() log.Printf("[*] waiting for messages, to exit process CTRL+C") <-forever } 生产者
package main import ( "RabbitMQ/RabbitMQ/RabbitMQ" "fmt" "strconv" "time" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQPubSub("NewProduct") for i := 0; i <= 100; i++ { rabbitmq.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据") fmt.Println("订阅模式生产第" + strconv.Itoa(i) + "条数据") time.Sleep(1 * time.Second) } } 消费者
package main import ( "RabbitMQ/RabbitMQ/RabbitMQ" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQPubSub("NewProduct") rabbitmq.ReceiveSub() }
创建实例、
// 路由模式下创建RabbitMQ实例 func NewRabbitMQRouting(exchangeName string, routingkey string) *RabbitMQ { // 创建RabbitMQ实例 rabbitmq := NewRabbitMQ("", exchangeName, routingkey) var err error // 获取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connect rabbitmq!") // 获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } // 路由模式发送消息 func (r *RabbitMQ) PublishRouting(message string) { // 1. 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, "direct", // 定向类型 true, // 持久化 false, // 是否删除 false, // true表示这个exchange不可以被client用来推送消息的,仅用来进行exchange和exchange之间的绑定 false, nil, ) r.failOnErr(err, "Failed to declare a exchange") // 2. 发送消息 err = r.channel.Publish( r.Exchange, r.Key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } // 路由模式消费端的代码 func (r *RabbitMQ) ReceiveRouting() { // 1. 试探性的创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, "direct", // 广播类型 true, // 持久化 false, // 是否删除 false, // true表示这个exchange不可以被client用来推送消息的,仅用来进行exchange和exchange之间的绑定 false, nil, ) r.failOnErr(err, "Failed to declare a exchange") // 2. 试探性创建队列 q, err := r.channel.QueueDeclare( "", // 随机生产队列名称 false, false, true, false, nil, ) r.failOnErr(err, "failed to declare a queue") // 绑定队列到 exchange中 err = r.channel.QueueBind( q.Name, r.Key, // 在订阅模式下,这里的key为空 r.Exchange, false, nil) // 消费消息 messages, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) if err != nil { fmt.Println(err) } forever := make(chan bool) // 3. 启用协程处理消息 go func() { for d := range messages { // 实现我们要处理的逻辑函数 log.Printf("Received a message: %s", d.Body) } }() log.Printf("[*] waiting for messages, to exit process CTRL+C") <-forever } 生产者
package main import ( "RabbitMQ/RabbitMQ/RabbitMQ" "fmt" "strconv" "time" ) func main() { rabbit_imooc_one := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one") rabbit_imooc_two := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two") for i := 0; i <= 100; i++ { rabbit_imooc_one.PublishRouting("Hello imooc one!" + strconv.Itoa(i)) rabbit_imooc_two.PublishRouting("Hello imooc two!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } } 消费者1 package main import "RabbitMQ/RabbitMQ/RabbitMQ" func main() { rabbitmq_imooc_one := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one") rabbitmq_imooc_one.ReceiveRouting() } 消费者2 package main import "RabbitMQ/RabbitMQ/RabbitMQ" func main() { rabbitmq_imooc_two := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two") rabbitmq_imooc_two.ReceiveRouting() } |
请发表评论