MQ消息可靠性

是什么

首先我们要知道。MQ是消息中间件,MQ的消息的发送是异步的。其主要步骤是:

生产者 -> 交换机 -> 队列 -> 消费者

其中有3个传输数据的地方,任意一个步骤出错都有可能导致数据丢失。那么我们怎么保证消息的可靠性呢?

为什么

其实消息可靠性是根据业务场景来进行要求的

  • 金融支付:丢消息就是丢钱
  • 订单系统:丢消息就是损失收入
  • 日志采集:可允许少量丢失,保证速度快
  • 实时监控:允许部分丢失,保证延迟低

怎么做

第一步:生产者确认机制(Publisher Confirms)

首先我们保证消息成功到达Broker(RabbitMQ服务器)

// 1. 开启生产者确认
channel.confirmSelect();

// 2. 发送消息
channel.basicPublish("exchange", "routingKey",
new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化消息
.build(),
message.getBytes());

// 3. 等待Broker确认
if (channel.waitForConfirms(5000)) {
System.out.println("消息已确认到达Broker");
} else {
System.out.println("消息发送失败,需要重试");
}

补充:

  • deliveryMode(),1表示非持久化消息,2表示持久化消息
  • 在这里我们使用的是同步确认
  • channel.basicPublish(“my_exchange”, “my_routing_key”, properties, messageBodyBytes);
    关于这四个参数:
    1. my_exchange:交换机名称
    2. my_routing_key:路由键
    3. properties:消息属性,它主要是包含消息元数据的对象,定义了消息本身的一些属性和行为,常见属性如下:
      1. Priority:优先级,数字越小优先级越高
      2. delivery_mode:消息持久化,1 内存,2 内存+磁盘
      3. expiration: 延迟消息:毫秒为单位
    4. messageBodyBytes:消息内容,以字节数组(byte[])的形式传递,生产者负责序列化,消费者负责反序列化

注:
properties:使用 AMQP.BasicProperties.Builder()来构建这个对象

第二步:消息持久化

消息持久化的目的是为了保证Broker重启后,消息不丢失

持久化有三点:

  • 交换机持久化(durable=true)
  • 队列持久化(durable=true)
  • 消息持久化(delivery_mode=2)

第三步:交换机-队列环节丢失风险

其实交换机到队列也会存在消息丢失的风险,具体场景如下:

  1. 路由键的错误绑定,消息被交换机直接丢弃
  2. 队列达到最大容量,新消息被丢弃或顶掉旧消息(取决于配置)
  3. Broker(RabbitMQ)出现问题,消息丢失

对应解决方法为:

  1. 启用ReturnListener监听无法路由的消息(死信处理)
// 1. 启用ReturnListener监听无法路由的消息
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.out.println("消息无法路由!交换器: " + exchange + ", 路由键: " + routingKey);
// 这里可以:记录日志、重试、转入死信队列等
});
  1. 处理队列满的情况
// 1. 设置合理的队列长度和溢出策略
Map<String, Object> args = new HashMap<>();
args.put("x-max-length", 1000); // 最大消息数
args.put("x-overflow", "reject-publish"); // 队列满时拒绝新消息(触发ReturnListener)
// 或者 "drop-head":丢弃最老的消息(默认)

// 2. 监控队列长度,动态调整
AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive("my_queue");
int messageCount = declareOk.getMessageCount();
if (messageCount > 800) {
// 触发告警或限流
}

第四步:消费者确认机制(Consumer Acknowledgments)

ACK是什么

消费者成处理消息后,主动通知Brooker删除该消息的机制

核心参数:

  • deliveryTag:消息的唯一投递标识(在同一个Channel内递增的long值)
  • Multiple:是否批量确认,为true表示批量确认,为false表示单条确认
  • requeue:是否重新入队,为true表示重新入队,为false表示丢弃

为什么必须手动ACK

  • 避免消息丢失:自动ACK(autoACK=true)在消息刚推送给消费者时就删除,不管消费者是否处理成功
  • 消费者弹性:允许消费者处理失败后重试(通过 NACK + requeue)

怎么做?

  1. ACK (确认成功)
// 成功处理一条消息后
channel.basicAck(deliveryTag, false); // false: 仅确认当前消息

作用就是告诉Broker消息已成功处理,可以删除该消息
然后消息会从队列中永久删除
如果不调用的话,消息会一直留在队列中

  1. NACK (确认失败)
// 处理失败,希望重新投递
channel.basicNack(deliveryTag, false, true); // false: 非批量, true: 重新入队(requeue)

// 处理失败,直接丢弃(或转死信)
channel.basicNack(deliveryTag, false, false); // false: 不重新入队

作用是告诉Broker,消息未成功处理
第三个参数是requeue,表示是否重新入队
如果为true,则消息会重新入队,重新处理
如果为false,则消息会被丢弃或转入死信队列

  1. Reject (拒绝)
// 等同于 basicNack(deliveryTag, false, true)
channel.basicReject(deliveryTag, true); // requeue=true

// 等同于 basicNack(deliveryTag, false, false)
channel.basicReject(deliveryTag, false); // requeue=false

作用同nack,但是只能操作单条信息

完美实现:

// 1. 设置 autoAck=false + 限流 prefetch
channel.basicQos(10); // 最多10条未确认消息
channel.basicConsume("order_queue", false, (consumerTag, delivery) -> {
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
// 2. 处理消息(核心业务逻辑)
processOrder(delivery.getBody());

// 3. 处理成功 -> ACK
channel.basicAck(deliveryTag, false);
} catch (TemporaryException e) {
// 4. 临时错误(可重试):延迟重试
if (getRetryCount(delivery) < MAX_RETRIES) {
sendToDelayQueue(delivery); // 发送到延迟队列
channel.basicAck(deliveryTag, false); // 确认原消息
} else {
// 4.1 超重试次数 -> 转死信
channel.basicReject(deliveryTag, false); // requeue=false
}
} catch (FatalException e) {
// 5. 致命错误(不可重试):直接转死信
channel.basicReject(deliveryTag, false); // requeue=false
} catch (Exception e) {
// 6. 未知错误:保守策略(转死信)
channel.basicReject(deliveryTag, false);
}
});

其他注意事项:

消息持久化并不是简单的保存到磁盘

持久化数据流:

  1. 生产者发送持久化消息
  2. Broker接收消息
  3. 记录“存储信息”操作到日志(*.journal)(保证原子性,崩溃恢复)
  4. (可选)Publish Confirm
  5. 消息进入内存队列
  6. 后台:应用日志到主存储
  7. 消费者消费并ack
  8. 记录“删除消息”操作到日志(保证ack操作的持久性)
  9. 后台:应用日志删除操作(从主存储移除消息)