MQ消息可靠性
MQ消息可靠性
是什么
首先我们要知道。MQ是消息中间件,MQ的消息的发送是异步的。其主要步骤是:
生产者 -> 交换机 -> 队列 -> 消费者
其中有3个传输数据的地方,任意一个步骤出错都有可能导致数据丢失。那么我们怎么保证消息的可靠性呢?
为什么
其实消息可靠性是根据业务场景来进行要求的
- 金融支付:丢消息就是丢钱
- 订单系统:丢消息就是损失收入
- 日志采集:可允许少量丢失,保证速度快
- 实时监控:允许部分丢失,保证延迟低
怎么做
第一步:生产者确认机制(Publisher Confirms)
首先我们保证消息成功到达Broker(RabbitMQ服务器)
// 1. 开启生产者确认 |
补充:
- deliveryMode(),1表示非持久化消息,2表示持久化消息
- 在这里我们使用的是同步确认
- channel.basicPublish(“my_exchange”, “my_routing_key”, properties, messageBodyBytes);
关于这四个参数:- my_exchange:交换机名称
- my_routing_key:路由键
- properties:消息属性,它主要是包含消息元数据的对象,定义了消息本身的一些属性和行为,常见属性如下:
- Priority:优先级,数字越小优先级越高
- delivery_mode:消息持久化,1 内存,2 内存+磁盘
- expiration: 延迟消息:毫秒为单位
- messageBodyBytes:消息内容,以字节数组(byte[])的形式传递,生产者负责序列化,消费者负责反序列化
注:
properties:使用 AMQP.BasicProperties.Builder()来构建这个对象
第二步:消息持久化
消息持久化的目的是为了保证Broker重启后,消息不丢失
持久化有三点:
- 交换机持久化(durable=true)
- 队列持久化(durable=true)
- 消息持久化(delivery_mode=2)
第三步:交换机-队列环节丢失风险
其实交换机到队列也会存在消息丢失的风险,具体场景如下:
- 路由键的错误绑定,消息被交换机直接丢弃
- 队列达到最大容量,新消息被丢弃或顶掉旧消息(取决于配置)
- Broker(RabbitMQ)出现问题,消息丢失
对应解决方法为:
- 启用ReturnListener监听无法路由的消息(死信处理)
// 1. 启用ReturnListener监听无法路由的消息 |
- 处理队列满的情况
// 1. 设置合理的队列长度和溢出策略 |
第四步:消费者确认机制(Consumer Acknowledgments)
ACK是什么
消费者成处理消息后,主动通知Brooker删除该消息的机制
核心参数:
- deliveryTag:消息的唯一投递标识(在同一个Channel内递增的long值)
- Multiple:是否批量确认,为true表示批量确认,为false表示单条确认
- requeue:是否重新入队,为true表示重新入队,为false表示丢弃
为什么必须手动ACK
- 避免消息丢失:自动ACK(autoACK=true)在消息刚推送给消费者时就删除,不管消费者是否处理成功
- 消费者弹性:允许消费者处理失败后重试(通过 NACK + requeue)
怎么做?
- ACK (确认成功)
// 成功处理一条消息后 |
作用就是告诉Broker消息已成功处理,可以删除该消息
然后消息会从队列中永久删除
如果不调用的话,消息会一直留在队列中
- NACK (确认失败)
// 处理失败,希望重新投递 |
作用是告诉Broker,消息未成功处理
第三个参数是requeue,表示是否重新入队
如果为true,则消息会重新入队,重新处理
如果为false,则消息会被丢弃或转入死信队列
- Reject (拒绝)
// 等同于 basicNack(deliveryTag, false, true) |
作用同nack,但是只能操作单条信息
完美实现:
// 1. 设置 autoAck=false + 限流 prefetch |
其他注意事项:
消息持久化并不是简单的保存到磁盘
持久化数据流:
- 生产者发送持久化消息
- Broker接收消息
- 记录“存储信息”操作到日志(*.journal)(保证原子性,崩溃恢复)
- (可选)Publish Confirm
- 消息进入内存队列
- 后台:应用日志到主存储
- 消费者消费并ack
- 记录“删除消息”操作到日志(保证ack操作的持久性)
- 后台:应用日志删除操作(从主存储移除消息)
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 花海!
