- Published on
RabbitMQ核心机制全解:交换机类型、消息保障与积压处理实战
- Authors
- Name
- Liant
rabbitmq基础篇
rabbitmq思维导图

PHP客户端使用 php-amqplib、amqp扩展
rabbitmq交换机类型
默认交换机
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。
它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
举个栗子:当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为"search-indexing-online"。因此,当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作
直连交换机
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:
将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。
直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

扇型交换机
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
分发系统使用它来广播各种状态和配置更新
在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)

主题交换机
主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。
使用案例:
分发有关于特定地理位置的数据,例如销售点
由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
股票价格更新(以及其他类型的金融数据更新)
涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
云端的不同种类服务的协调
分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
头交换机
有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。
头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。
Features功能
Features 功能
D:d 是 durable 的缩写,代表这个队列中的消息支持持久化。 AD:ad 是 autoDelete 的缩写。代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除。
excl:是 exclusive 的缩写。代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
Args:是 arguments 的缩写。代表该队列配置了 arguments 参数。
TTL:是 x-message-ttl 的缩写。设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒。
Exp:Auto Expire,是 x-expires 配置的缩写。当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp。注意这里是删除队列,不是队列中的消息。
Lim:说明该队列配置了 x-max-length。限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉。
Lim B:说明队列配置了 x-max-length-bytes。限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小。
DLX:说明该队列配置了 x-dead-letter-exchange。当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉。
DLK:x-dead-letter-routing-key 的缩写,将删除的消息推送到指定交换机的指定路由键的队列中去。
Pri:x-max-priority 的缩写,优先级队列。表明该队列支持优先级,先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。
Ovfl:x-overflow 的缩写。队列中的消息溢出时,如何处理这些消息。要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息。有两个配置项:drop-head,代表丢弃队列头部的消息,默认行为;reject-publish 设置队列中的消息溢出后,该队列的行为:”拒绝接收”(所有消息)。
ha-all:镜像队列。all 表示镜像到集群上的所有节点,ha-params 参数忽略
应答方式
basicAck:成功消费,消息从队列中删除 basicNack:拒绝消息,requeue=true,消息重新进入队列,false被删除 basicReject:等同于basicNack basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer
basicQos:每次发送数量
消息回复:
Ack:(确认)收到一个或者多个消息
消费者确认收到一个或者多个消息
deliveryTag:服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息的唯一标识,是一个递增的正整数
multiple:true表示确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false只确认deliveryTag指定的消息
basicRecover:重新发送
要求代理重新发送未确认的消息
requeue:如果为true,消息将会重新入队,可能会被发送给其它的消费者;如果为false,消息将会发送给相同的消费者
basicNack:拒绝消息
拒绝接收到的一个或者多个消息
deliveryTag:接收到消息的唯一标识
multiple: true表示拒绝所有的消息,包括提供的deliveryTag;false表示仅拒绝提供的deliveryTag
requeue:true 表示拒绝的消息应重新入队,而不是否丢弃 */
basicReject:拒绝消息
消息相关问题
消费时消息重复
生产时消息重复
消费时消息重复(保证消息幂等性)
在消费者方面如果出现网络问题,比如消费者对消息已经成功消费了,在向MQ服务器进行确认的时候网络异常了,这时候MQ服务器就没有接收到确认,MQ为了保证消息被消费,就会继续向消费者发送之前已经被消费了的消息,这种情况下消费者就会接收到两条一样的消息
消息丢失
生产端丢失 AMQP协议提供的一个事务机制(这是同步操作,一条消息发送之后会使发送端阻塞,生产者生产消息的吞吐量和性能都会大大降低)
首先生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一deliveryTag和multiple参数),这就使得生产者知晓消息已经正确到达了目的地了。
Confirm模式有三种方式实现:
- 串行confirm模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传。
- 批量confirm模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端confirm。
- 异步confirm模式:提供一个回调方法,broker confirm了一条或者多条消息后producer端会回调这个方法。 我们分别来看看这三种confirm模式
发送到生产端 但没有匹配队列 消息丢失
- 1). 使用mandatory 设置true
- 2). 利用备份交换机(alternate-exchange):实现没有路由到队列的消息
消息积压
消费者消费消息的速度赶不上生产速度,这总问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需要改业务流程或逻辑已保证消费度跟上生产消息的速,譬如增加消费者的数量等。
消费者出现异常,导致一直无法接收新的消息,这种问题需要排查消费的逻辑是不是又问题,需要优化程序。
除了上面的者两种问题,还有一些其他情况会导致消息积压,譬如一些系统是无法预计成产消息的速度和频率,又或者消费者的速度已经被限制,不能通过加新的消费者来解决,譬如不同的系统间的API对接,对接那一方就做了请求频率的限制,或者对方系统承受不了太大的并发,还有一些系统如果是面对企业客户,譬如电商,物流,仓储等类似平台系统的客户的下单是没有规律的或者集中某一个时间段下单的,这种就不能简单的通过加消费者来解决,就需要分析具体业务来避免消息积压。
使用Redis的消息队列,设置消息过期时间
设置并发消费两个关键属性concurrentConsumers和prefetchCount