✅P260_商城业务-消息队列-可靠投递-发送端确认

gong_yz大约 3 分钟谷粒商城

商城业务-消息队列-可靠投递-发送端确认


RabbitMQ消息确认机制-可靠抵达

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制

publisher confirmCallback 确认模式

publisher returnCallback 未投递到 queue 退回模式

consumer ack机制

可靠抵达-ConfirmCallback

spring.rabbitmq.publisher-confirms=true

  • 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback
  • CorrelationData:用来表示当前消息唯一性。
  • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用confirmCallback。
  • 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

可靠抵达-ReturnCallback

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.mandatory=true

  • confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式
  • 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

开启ComfirmCallBack回调函数

application.properties

spring.rabbitmq.publisher-confirms=true

编写回调函数:

cfmall-order/src/main/java/com/gyz/cfmall/order/config/MyRabbitConfig.java

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 当MyRabbitConfig对象创建完成后,执行此方法
     */
    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData  消息的唯一id
             * @param b                broker是否接收到消息
             * @param s                失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("correlationData:" + correlationData + "===>broker是否接收到消息:" + b + "===>失败的原因:" + s);
            }
        });
    }

发送消息时设置消息的唯一id

cfmall-order/src/main/java/com/gyz/cfmall/order/controller/RabbitController.java

    @GetMapping("/sendUniqueMessageId")
    public R sendUniqueMessageId(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        for (int i = 0; i < num; i++) {
            OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setName("哈哈" + i);
            orderReturnReasonEntity.setSort(1);
            orderReturnReasonEntity.setStatus(0);
            orderReturnReasonEntity.setCreateTime(new Date());

            rabbitTemplate.convertAndSend(
                    "new-direct-change",
                    "queuesChange",
                    orderReturnReasonEntity,
                    new CorrelationData(UUID.randomUUID().toString()));
        }
        return R.ok();
    }

查看broker接收消息之后ConfirmCallback回调函数的执行结果:


开启ReturnCallBack回调函数

application.properties

# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调我们这个ReturnCallback
spring.rabbitmq.template.mandatory=true

编写回调函数

cfmall-order/src/main/java/com/gyz/cfmall/order/config/MyRabbitConfig.java#setReturnCallback

	@Autowired
    RabbitTemplate rabbitTemplate;

	@PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData  消息的唯一id
             * @param b                broker是否接收到消息
             * @param s                失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("correlationData:" + correlationData + "===>broker是否接收到消息:" + b + "===>失败的原因:" + s);
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message 投递失败的消息详细信息
             * @param i       回复的状态码
             * @param s       回复的文本内容
             * @param s1      当时这个消息发给哪个交换机
             * @param s2      当时这个消息用哪个路邮键
             */
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("Message:" + message + "\n===>replyCode:" + i + "\n===>replyText:" + s + "\n===>exchangeName:" + s1 + "\n===>routekey:" + s2);
            }
        });
    }

测试

注:模拟Exchange未能将消息投递到目标 queue,最简单的办法就是修改为不存在的路由键

发送消息代码:

@GetMapping("/sendUniqueMessageId")
public R sendUniqueMessageId(@RequestParam(value = "num", defaultValue = "10") Integer num) {
    for (int i = 0; i < num; i++) {
        OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setName("哈哈" + i);
        orderReturnReasonEntity.setSort(1);
        orderReturnReasonEntity.setStatus(0);
        orderReturnReasonEntity.setCreateTime(new Date());

        rabbitTemplate.convertAndSend(
                "new-direct-change",
                "queuesChanges",  //routekey:queuesChanges是不存在的
                orderReturnReasonEntity,
                new CorrelationData(UUID.randomUUID().toString()));
    }
    return R.ok();
}

打印结果:

Message:(Body:'{"id":1,"name":"哈哈0","sort":1,"status":0,"createTime":1695194384329}' MessageProperties [headers={spring_returned_message_correlation=cbfe34e4-ff28-4869-a1bf-68ca92ad1d22, X-B3-SpanId=d2dff1277bf37fdf, X-B3-ParentSpanId=77666c875712e375, X-B3-Sampled=0, X-B3-TraceId=77666c875712e375, __TypeId__=com.gyz.cfmall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
===>replyCode:312
===>replyText:NO_ROUTE
===>exchangeName:new-direct-change
===>routekey:queuesChanges