在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
背景知识:在学习RabbitMQ之前,我们需要对下面的知识有些概念, 生产者(producer):产生并发送消息的程序。 队列(queue):存在RabbitMQ中的邮筒,虽然消息是在应用程序和RabbitMQ中进行传递,但队列才是唯一能够存储消息的地方。队列的大小取决于宿主机器的内存和磁盘容量,它本质上是一个巨大的消息缓存池。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列中读取消息。这个队列有一个特点,先进先出。 消费者(consuming):等待接收消息的程序。
问题一:RabbitMQ如何解决生产者生产过快,消费者消费过慢的问题?在看这个问题之前,我们先看下这个问题:网络中,如果一个机器(producer)想把数据发送给另外一台机器(consumer),那么它应该怎么做? 答案是:它们之间需要建立一个连接,如下图所示,这样貌似就解决了生产者与消费者之间传递数据的问题。不过这样以来producer与consumer之间就绑定了,这个连接也要一直存在,要不然它们之间就没有办法通讯。 如果它们都很闲,或者它们的处理速度差不多(备注:生产者生产数据的速度和消费者消费数据的速度相当)的情况下,这都不是问题。 可是,一旦生产者生产数据过快,或者消费者消费数据过慢,这样就会出问题,生产者产生的数据没有办法被及时处理完。这样就会导致这些数据被丢弃掉,或者生产者只能暂时停止继续生产数据,但是生产者又被绑死在这个消费者上面,也没有办法去干别的事情。 要解决上面的问题,我们该怎么办呢?一般有两种办法: 方法一:新增消费者并让生产者与它再建立连接,然后生产者自己决策如何给这么多的消费者分配数据。这样的话会有两个结果: 第一,生产者需要与另外一个消费者再建立一条连接。第二,生产者需要自己添加数据分发策略,这样会导致生产者的逻辑变得复杂了很多。 方法二:将生产者产生的数据放到缓存中(也就是消息队列中),而消费者也从这个缓存中获取数据,如下图所示,这也是RabbitMQ的实现方式。这样的话,会有两个好处: 第一,生产者不需要与消费者绑定,它们只需要与消息队列绑定就好了,生产者和消费者成功完成解耦操作。第二,生产者和消费者的速度,可以不一致,就算生产者很快,消费者很慢也没有问题,只要它们能够保证消息队列不满的话,消费者就可以慢慢处理,生产者可以不停的去生产数据。 下面我们来看一下go-amqp例子,是如何实现的这一步操作: 左边是生产者的核心代码部分,右边是消费者的核心代码部分。 运行的时候,我们需要按照下面的步骤来操作: 首先,启动rabbitmq服务器 $ rabbitmq-server ........ Starting broker... completed with 6 plugins // 表示启动成功 其次,启动消费者 $ ./receive 最后,启动生产者,分别发送三次数据给rabbitmq-server $ ./send hello 2019/11/03 16:32:45 [x] Sent hello $ ./send world 2019/11/03 16:32:53 [x] Sent world $ ./send I love U 2019/11/03 16:33:13 [x] Sent I love U
问题二:RabbitMQ如何解决多个消费者调度的问题?当一个消费者怎么都处理不过来的时候,最终还是应该新增消费者来处理,如下图所示。在新增消费者的时候后,RabbitMQ的优势就体现出来了,新增消费者的时候,消费者只是与消息队列建立了新的连接,并且也不会增加生产者的代码复杂度。 不过这样也带来了一个新的问题:消息队列怎么决定,同一时刻哪一个消费者来消费这个消息? RabbitMQ最简单的方式就是时间轮询策略,也就是保证队列先进先出,本时刻哪一个消费者来消息数据,就给到哪一个消费者。 下面是多个消费者调度的展示例子, 我们启动两个消费者,一个生产者,如下图所示:消费者一:$ ./receive 消费者二:$ ./receive 生产者:
问题三:RabbitMQ如何保证消息队列中的数据,确实被消费者已经处理掉了?在真实的网络中,网络往往不可靠。也就是说有可能会存在消息被消费者拿走之后,因为网络原因导致消息并没有真正发送到消费者。 RabbitMQ采用的是消息确认机制,也就是消费者取走消息之后,在处理完了这个消息,需要要主动发送ACK给消息队列,消息队列在收到这个ACK之后,才可以删除这个消息。例子如下所示:
生产者发送的消息内容: 消费者消费了第一和第二次数据: 不过,这也要求消息队列必须在ACK回来的这段时间内保证不删除,可是如果ACK一直不来呢? 这样就会导致这个消息一直放在消息队列中不被处理,进而导致RabbitMQ上面的内存泄漏。 我们先来看下消息丢失的场景,一般有三种:第一种,消费者真的就没有回复;第二种,消费者回复了,但是网络原因给丢弃了;第三种,网络断开或者连接关掉。第一种和第三种最为常见,第二种,其实并不是很常见。 第一种情况,其实是消费者那端的代码问题,需要消费者修复才行。 第二种情况,往往是因为使用的底层通讯库有bug导致的,因为在连接不断开的前提下,只要消息发出去了,TCP协议会保证消息到达对端的。RabbitMQ并没有对这种场景做处理,因为RabbitMQ并不知道,这个消息是消费者丢失了,还是网络丢失了,当然了它也不应该关心这也业务场景。不过在设计的时候,我们到可以让消费者根据自身业务,添加超时处理机制,例如:消费者在长时间得不到RabbitMQ新的消息的时候,可以尝试去重发上一个消息的ACK消息。 第三种情况,RabbitMQ在监测到网络断开或者连接关掉的时候,会主动将这个消息再一次放回到消息队列里面,让后续消费者可以再取一次消息。 问题四:RabbitMQ如何保证消费者处理的公平性?上面讨论的消息内容都是相同或者相似大小的情况下,一旦者消息的大小不同,在RabbitMQ的轮询策略下,就很有可能导致大任务的消息被分配给同一个消费者,导致这个消费者很忙,而其他的消费者却比较闲。 基于这个问题,RabbitMQ采用公平策略做了处理,大体就是在消费者没有将分配到的消息处理完的时候,不在分配新的消息给他,这样就能够让闲一点的消费者去消息队列继续拿新的消息,而忙的消费者一心一意的处理拿到的这个大任务消息。例子如下所示:
消费者一,拿到了一个处理时间比较久的数据,所以一直在处理这个消息。 消费者二,拿到了比较短的数据,所以可以很快的处理完,便可以很快的分配到别的数据。 问题五:一旦RabbitMQ挂掉了,该怎么办呢?基于这个问题,RabbitMQ也做了处理,叫做消息持久化,在RabbitMQ挂掉之前的那些消息队列中的消息,它都会存到硬盘里面,等到RabbitMQ重启之后,会将这些数据重新恢复出来。 当然,对于生产者已经发送,却没有收到确认的消息,需要生产者单独做异常处理。
总结:本文只是对rabbitmq的基本使用,碰到的问题以及解决方法做了详解和举例说明,希望对你有所帮助。对于rabbitmq的路由部分,是另外一类内容,笔者会在后面一篇给出。 文章会在灰子学技术公众号首发,如果你觉得文章对你有帮助,还请关注“灰子学技术”。 灰子学技术: |
请发表评论