✅P260_商城业务-消息队列-可靠投递-发送端确认
大约 3 分钟
商城业务-消息队列-可靠投递-发送端确认
RabbitMQ消息确认机制-可靠抵达
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
publisher confirmCallback 确认模式
publisher returnCallback 未投递到 queue 退回模式
consumer ack机制
可靠抵达-ConfirmCallback
spring.rabbitmq.publisher-confirms=true
- 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback
- CorrelationData:用来表示当前消息唯一性。
- 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用confirmCallback。
- 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback
可靠抵达-ReturnCallback
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式
- 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
开启ComfirmCallBack回调函数
application.properties
spring.rabbitmq.publisher-confirms=true
编写回调函数:
cfmall-order/src/main/java/com/gyz/cfmall/order/config/MyRabbitConfig.java
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 当MyRabbitConfig对象创建完成后,执行此方法
*/
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 消息的唯一id
* @param b broker是否接收到消息
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("correlationData:" + correlationData + "===>broker是否接收到消息:" + b + "===>失败的原因:" + s);
}
});
}
发送消息时设置消息的唯一id
cfmall-order/src/main/java/com/gyz/cfmall/order/controller/RabbitController.java
@GetMapping("/sendUniqueMessageId")
public R sendUniqueMessageId(@RequestParam(value = "num", defaultValue = "10") Integer num) {
for (int i = 0; i < num; i++) {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("哈哈" + i);
orderReturnReasonEntity.setSort(1);
orderReturnReasonEntity.setStatus(0);
orderReturnReasonEntity.setCreateTime(new Date());
rabbitTemplate.convertAndSend(
"new-direct-change",
"queuesChange",
orderReturnReasonEntity,
new CorrelationData(UUID.randomUUID().toString()));
}
return R.ok();
}
查看broker接收消息之后ConfirmCallback回调函数的执行结果:
开启ReturnCallBack回调函数
application.properties
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调我们这个ReturnCallback
spring.rabbitmq.template.mandatory=true
编写回调函数
cfmall-order/src/main/java/com/gyz/cfmall/order/config/MyRabbitConfig.java#setReturnCallback
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 消息的唯一id
* @param b broker是否接收到消息
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("correlationData:" + correlationData + "===>broker是否接收到消息:" + b + "===>失败的原因:" + s);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 投递失败的消息详细信息
* @param i 回复的状态码
* @param s 回复的文本内容
* @param s1 当时这个消息发给哪个交换机
* @param s2 当时这个消息用哪个路邮键
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("Message:" + message + "\n===>replyCode:" + i + "\n===>replyText:" + s + "\n===>exchangeName:" + s1 + "\n===>routekey:" + s2);
}
});
}
测试
注:模拟Exchange未能将消息投递到目标 queue,最简单的办法就是修改为不存在的路由键
发送消息代码:
@GetMapping("/sendUniqueMessageId")
public R sendUniqueMessageId(@RequestParam(value = "num", defaultValue = "10") Integer num) {
for (int i = 0; i < num; i++) {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("哈哈" + i);
orderReturnReasonEntity.setSort(1);
orderReturnReasonEntity.setStatus(0);
orderReturnReasonEntity.setCreateTime(new Date());
rabbitTemplate.convertAndSend(
"new-direct-change",
"queuesChanges", //routekey:queuesChanges是不存在的
orderReturnReasonEntity,
new CorrelationData(UUID.randomUUID().toString()));
}
return R.ok();
}
打印结果:
Message:(Body:'{"id":1,"name":"哈哈0","sort":1,"status":0,"createTime":1695194384329}' MessageProperties [headers={spring_returned_message_correlation=cbfe34e4-ff28-4869-a1bf-68ca92ad1d22, X-B3-SpanId=d2dff1277bf37fdf, X-B3-ParentSpanId=77666c875712e375, X-B3-Sampled=0, X-B3-TraceId=77666c875712e375, __TypeId__=com.gyz.cfmall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
===>replyCode:312
===>replyText:NO_ROUTE
===>exchangeName:new-direct-change
===>routekey:queuesChanges