✅P261_商城业务-消息队列-可靠投递-消费端确认
大约 3 分钟
商城业务-消息队列-可靠投递-消费端确认
可靠抵达-Ack消息确认机制
概念
消费者获取到消息,成功处理,可以回复Ack给Broker
basic.ack
用于肯定确认;broker将移除此消息basic.nack
用于否定确认;可以指定broker是否丢弃此消息,可以批量basic.reject
用于否定确认;同上,但不能批量
默认自动ack,消息被消费者收到,就会从broker的queue中移除
queue无消费者,消息依然会被存储,直到消费者消费
消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成, 或者成功处理。我们可以开启手动ack模式
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户 端断开,消息不会被broker移除,会投递给别人
说明:
消费者消费消息默认采用的是自动ACK也就是自动签收,broker通过通道将消息都传递给消费者之后自动将消息移除队列,这个就是自动ACK。采用自动ACK将会出现一些问题:当消费者接收到许多条消息时,依次处理这些消息但是在此期间宕机了将会导致后续未处理的消息丢失。
解决方案:手动ACK
实现
手动ACK设置
application.properties
# 手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
手动ACK回复
cfmall-order/src/main/java/com/gyz/cfmall/order/service/impl/OrderServiceImpl.java
@RabbitListener(queues = {"new-queus"})
public void receiveMessage(Message message, OrderReturnReasonEntity entity, Channel channel) {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接收到消息...内容:" + entity);
//该消息的index
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
if (deliveryTag % 2 == 0) {
channel.basicAck(deliveryTag, false);
System.out.println("签收了货物..." + deliveryTag);
} else {
channel.basicNack(deliveryTag, false, true);
System.out.println("没有签收货物..." + deliveryTag);
}
} catch (IOException e) {
//网络中断
}
}
channel.basicAck(deliveryTag, false);
deliveryTag
:唯一标识 ID。当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channelmultiple
:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicNack(deliveryTag, false, true);
deliveryTag
:唯一标识 ID,上面已经解释了multiple
:上面已经解释了requeue
- true :重回队列
- false :丢弃,我们在nack方法中必须设置 false,否则重发没有意义
channel.basicReject
方法需要传递两个参数
deliveryTag
:唯一标识 ID,上面已经解释了。requeue
:上面已经解释了,在reject方法里必须设置true。