MQ消息丢失的情况:
生产者确认机制
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未成功投递到交换机,返回nack
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因。

PS:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
1.在生产者服务中的application.yml添加配置:
1 2 3 4 5 6
| spring: rabbitmq: publisher-returns: true publisher-confirm-type: correlated template: mandatory: true
|
配置说明:
publisher-confirm-type:开启publisher-confirm,支持的两种类型:
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback,MQ返回结果时回调ConfirmCallback
publisher-returns:开启publisher-return功能,同样基于callback机制,定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false,则直接丢弃消息
2.配置ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware {
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息发生失败,应答码={},原因={},交换机={},路由键={},消息={}", replyCode, replyText, exchange, routingKey, message.toString()); }); } }
|
3.设置ConfirmCallback,指定的消息ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void confirmTest() { String exchangeName = "ezio.topic"; String message = "今天天气不错"; CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback( result -> { if (result.isAck()) { log.debug("消息成功投递到交换机!消息ID:{}", correlationData.getId()); } else { log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId()); } }, ex -> log.error("消息发送失败!",ex) ); rabbitTemplate.convertAndSend(exchangeName, "china.weather", message, correlationData); } }
|
消息持久化
MQ默认时内存存储消息,开启持久化功能可以确保消息缓存在mq中的消息不丢失。
1.交换机持久化:默认true开启
2.队列持久化:在定义队列时设置
1 2 3 4 5
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1", durable = "true"), //绑定的队列 exchange = @Exchange(name = "test.direct"), //声明交换机名称 key = {"red", "blue"} //路由key,可配置通配符,red.# or red.* ))
|
3.消息持久化,SpringAMQP中消息默认持久,可以通过MessageProperties中的DeliveryMode指定:
1 2 3
| Message message = MessageBuilder.withBody("今天天气不错".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build();
|
消费者确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到回执后才会删除该消息,SpringAMQP提供三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用API发送ack。
- auto:自动ack,有spring检测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,消息投递后立即被删除。
1 2 3 4 5 6 7 8 9 10 11
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: ezio password: 123456 virtual-host: / listener: simple: prefetch: 1 acknowledge-mode: auto
|
消费者失败重试
消费者出现异常后,消息会不断requeue(重新入队)到队列,在重新发送给消费者,然后再次异常,再次requeue,导致mq的消息处理飙升,带来不必要的压力。
可以利用spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: ezio password: 123456 virtual-host: / listener: simple: prefetch: 1 acknowledge-mode: auto retry: enabled: true initial-interval: 1000 multiplier: 1 max-attempts: 3 stateless: true
|
消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果依然失败,则需要有MessageRecoverer接口来处理,它包含三个不同实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认实现方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

实现方式:定义接受失败消息的交换机,队列及其绑定关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@Configuration public class ErrorMessageConfig {
@Bean public DirectExchange errorExchange() { return new DirectExchange("error.direct", true, false); }
@Bean public Queue errorQueue() { return new Queue("error.queue", true); }
@Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) { return BindingBuilder .bind(errorQueue) .to(errorExchange).with("error"); }
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
}
|