欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

03RabbitMq-SpringBoot整合RabbitMq五种模式

时间:2023-07-15
一、Fanout模式 生产者

1.导入amqp协议依赖

org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web

2.配置相关

新建application.yml配置IP 用户名密码等

server: port: 8080spring: #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest #虚拟host 可以不设置,使用server默认host virtual-host: /

3.新建订单派发的服务

@Servicepublic class OrderService { @Autowired private RabbitTemplate rabbitTemplate; private String exchangeName = "fanout_order_exchange"; private String routeKey = ""; public void makeOrder(Long userId, Long productId, int num) { // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); }}

3.绑定交换机与队列关系

通过新建Configuration类来配置

@Configurationpublic class FanoutRabbitConfig { //步骤1.声明交换机 @Bean public FanoutExchange fanoutOrderExchange() { return new FanoutExchange ("fanout_order_exchange", true, false); } //步骤2.声明队列 @Bean public Queue emailQueue() { return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { return new Queue("weixin.fanout.queue", true); } //步骤3.完成绑定关系 @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()); }}

4.编写发送消息的测试类

@SpringBootTestclass SpringBootOrderRabbitmqProducerApplicationTests { @Autowired OrderService orderService; @Test void contextLoads() throws InterruptedException { orderService.makeOrder(1, 1, 12); }}

通过观察mqweb界面发现绑定关系建立,且队列中已经有发送的消息了。

消费者

在同一个项目中新建消费者模块,并复制配置.yml文件到消费者模块(请记得修改端口,否则会出现端口冲突)。

1.创建三个消费者来接收消息

@RabbitListener(queues = "weixin.fanout.queue")//通过注解监听接收队列消息@Servicepublic class EmailController { @RabbitHandler//接收消息交由此注解标注的方法处理 public void messagerevice(String msg){ System.out.println("微信发送消息:"+msg); }}

@RabbitListener(queues = "sms.fanout.queue")//通过注解监听接收队列消息@Servicepublic class SmsController { @RabbitHandler//接收消息交由此注解标注的方法处理 public void messagerevice(String msg){ System.out.println("sms发送消息:"+msg); }}

@RabbitListener(queues = "email.fanout.queue")//通过注解监听接收队列消息@Servicepublic class EmailController { @RabbitHandler//接收消息交由此注解标注的方法处理 public void messagerevice(String msg){ System.out.println("Email发送消息:"+msg); }}

二、Direct模式

新建订单派发服务添加指定routingkey:

@Servicepublic class OrderService { @Autowired private RabbitTemplate rabbitTemplate; private String exchangeName = "direct_order_exchange"; private String routeKey1 = "email"; private String routeKey2 = "sms"; public void makeOrder(Long userId, Long productId, int num) { // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey1, orderNumer); rabbitTemplate.convertAndSend(exchangeName, routeKey2, orderNumer); }}

SpringBoot整合Direct模式与上述Fanout模式类似,两个不同

声明交换机时注入directExchange在绑定交换机与队列关系中通过with()方法绑定routingkey:

@Configurationpublic class DirectRabbitConfig { //步骤1.声明交换机 @Bean public DirectExchange directOrderExchange() { return new DirectExchange("direct_order_exchange", true, false); } //步骤2.声明队列 @Bean public Queue emailQueue() { return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { return new Queue("weixin.fanout.queue", true); } //步骤3.完成绑定关系(包括绑定routingkey) @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with("weixin"); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("sms"); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("email"); }}

编写消费者测试类同上

另外:若未启动生产者,直接启动消费者就会报错:队列以及绑定关系不存在。
解决办法:消费者直接与队列产生直接关系。将声明队列以及建立绑定关系的方法定义消费者模块当中甚至两个模块同时定义,这样在启动消费者时就会产生绑定声明关系。

三、Topic模式 方法1:通过Configuration配置类:

与上述direct模式类似,修改的地方差不多:
1.声明队列用TopicExchange
2.利用with()方法绑定模糊routingkey

方法2:通过注解声明队列绑定关系

在消费者模块通过@RabbitMqListener来声明队列,绑定模糊routingkey

@RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "email.topic.queue",autoDelete = "false",durable = "true"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "topic_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.TOPIC),key = "*.email.#"))@Servicepublic class EmailControllerTopic { @RabbitHandler public void emailTopicRevice(String msg){ System.out.println("topic->>>>>>>>>邮件发送消息:"+msg); }}

@RabbitListener(bindings =@QueueBinding( value = @Queue(value = "sms.topic.queue",autoDelete = "false",durable = "true"), exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),key = "#.sms.#"))@Servicepublic class SMSControllerTopic { @RabbitHandler public void smsTopicRevice(String msg){ System.out.println("topic->>>>>>>>>sms发送消息:"+msg); }}

@RabbitListener(bindings =@QueueBinding( value = @Queue(value = "weixin.topic.queue",autoDelete = "false",durable = "true"), exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),key = "com.#"))@Servicepublic class WechatControllerTopic { @RabbitHandler public void emailTopicRevice(String msg){ System.out.println("topic ->>>>>>>>>微信发送消息:"+msg); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。