✅P259_商城业务-消息队列-RabbitListener-RabbitHandler接收消息

gong_yz大约 4 分钟谷粒商城

商城业务-消息队列-RabbitListener&RabbitHandler接收消息


@RabbitListener

@RabbitListener使用前提:必须有@EnableRabbit并且标注方法的类必须在组件中(启动类加上@EnableRabbit注解)

@RabbitListener标注类上监听多个队列

@RabbitHandler标注在方法上用于接受不同类型的消息对象

示例

cfmall-order/src/main/java/com/gyz/cfmall/order/service/impl/OrderServiceImpl.java

@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {
	@RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Object message) {
        System.out.println("接收到消息...内容:" + message + "===>类型:" + message.getClass());
    }
}

启动订单服务,打印如下:

接收到消息...内容:(Body:'{"id":1,"name":"hahaha","sort":1,"status":0,"createTime":1694764377331}' MessageProperties [headers={__TypeId__=com.gyz.cfmall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=new-direct-change, receivedRoutingKey=queuesChange, deliveryTag=1, consumerTag=amq.ctag-7wgm24_4RakC5ay-BPvFtA, consumerQueue=new-queus])===>类型:class org.springframework.amqp.core.Message

message组成:①消息体+消息头 ②class org.springframework.amqp.core.Message类型的对象

方法的参数类型

1.class org.springframework.amqp.core.Message类型的对象

    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message msg) {
        //获取消息体
        byte[] body = msg.getBody();
        //获取消息头
        MessageProperties messageProperties = msg.getMessageProperties();
        System.out.println("接收到消息...内容:" + msg + "===>类型:" + msg.getClass());
    }

2.T<发送消息的类型> :假如发送消息的类型为 OrderReturnReasonEntity 则接受的消息类型也可以为 OrderReturnReasonEntity

    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message message, OrderReturnReasonEntity entity) {
        byte[] body = message.getBody();
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接收到消息...内容:" + entity);
    }

输出:

接收到消息...内容:OrderReturnReasonEntity(id=1, name=hahaha, sort=1, status=0, createTime=Fri Sep 15 15:45:14 GMT+08:00 2023)

3.Channel channel:当前传输数据的通道

    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message message, OrderReturnReasonEntity entity, Channel channel) {
        byte[] body = message.getBody();
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接收到消息...内容:" + entity);
    }

模拟多个客户端接收消息

多个客户端监听Queue。只要收到消息,队列就删除消息,而且只能有一个客户端收到此消息的场景

1.订单服务启动多个,同一个消息,只能有一个客户端收到

新创建一个订单服务,操作如下:

新建完成启动订单服务CfmallOrderApplication--8901

接收消息代码如下

com.gyz.cfmall.order.service.impl.OrderServiceImpl#receiveMessage

    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message message, OrderReturnReasonEntity entity, Channel channel) {
        byte[] body = message.getBody();
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接收到消息...内容:" + entity);
        System.out.println("消息处理完成" + entity.getName());
    }

发送消息代码如下

com.gyz.cfmall.order.controller.RabbitController#sendMessage

package com.gyz.cfmall.order.controller;

import com.gyz.cfmall.order.entity.OrderReturnReasonEntity;
import com.gyz.common.utils.R;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

/**
 * @author gong_yz
 * @Description
 * @Date 2023/9/15
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage")
    public R sendMessage(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        for (int i = 0; i < num; i++) {
            OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setName("哈哈" + i);
            orderReturnReasonEntity.setSort(1);
            orderReturnReasonEntity.setStatus(0);
            orderReturnReasonEntity.setCreateTime(new Date());

            rabbitTemplate.convertAndSend("new-direct-change", "queuesChange", orderReturnReasonEntity);
            System.out.println("消息发送成功");
        }
        return R.ok();
    }
}

测试结果:同一个消息,只能有一个客户端收到

2. 只有当一个消息完全处理完,方法运行结束,客户端才可以接收下一个消息

接收消息代码:

com.gyz.cfmall.order.service.impl.OrderServiceImpl#receiveMessage

	@RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message message, OrderReturnReasonEntity entity) throws InterruptedException {
        byte[] body = message.getBody();
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接收到消息...内容:" + entity);
        Thread.sleep(3000);
        System.out.println("消息处理完成==>" + entity.getName());
    }

测试结果:


@RabbitHandler

@RabbitListener标注类上监听多个队列

@RabbitHandler标注在方法上用于接受不同类型的消息对象

模拟向队列发送不同消息对象

发送消息代码:cfmall-order/src/main/java/com/gyz/cfmall/order/controller/RabbitController.java

package com.gyz.cfmall.order.controller;

import com.gyz.cfmall.order.entity.OrderEntity;
import com.gyz.cfmall.order.entity.OrderReturnReasonEntity;
import com.gyz.common.utils.R;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;

/**
 * @author gong_yz
 * @Description
 * @Date 2023/9/15
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage")
    public R sendMessage(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        for (int i = 0; i < num; i++) {
            if (i % 2 == 0) {
                OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
                orderReturnReasonEntity.setId(1L);
                orderReturnReasonEntity.setName("哈哈" + i);
                orderReturnReasonEntity.setSort(1);
                orderReturnReasonEntity.setStatus(0);
                orderReturnReasonEntity.setCreateTime(new Date());

                rabbitTemplate.convertAndSend("new-direct-change", "queuesChange", orderReturnReasonEntity);
                System.out.println("OrderReturnReasonEntity消息发送成功");
            } else {
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("new-direct-change", "queuesChange", orderEntity);
                System.out.println("OrderEntity消息发送成功");
            }
        }
        return R.ok();
    }
}

接收消息代码:

	/**
	* @RabbitHandler标注在方法上,重载区分不同的消息
	*/
	@RabbitHandler
    public void receiveMessage(Message message, OrderReturnReasonEntity entity) {
        byte[] body = message.getBody();
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接收到消息...内容:" + entity);
    }

    @RabbitHandler
    public void receiveMessage(OrderEntity orderEntity) {
        System.out.println("接收到消息...内容:" + orderEntity.getOrderSn());
    }

测试结果如下