欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ【DirectExchange交换机模式】【TopicExchange交换机模式】实现

时间:2023-04-22
RabbitMQ【Direct Exchange交换机模式】【TopicExchange交换机模式】实现

RabbitMQ是基于AMQP协议的一种消息队列,常用于异步处理、解耦合的操作场景。

什么叫交换机?

发送者sender向外发送信息的过程中,并不直接投递到mq队列中来,而是先放在交换机上,由交换机进行投递,再把数据发送到队列上

Springboot集成RabbitMQ(Direct交换机模式实现)

添加依赖spring-boot-starter-amqp

org.springframework.boot spring-boot-starter-amqp

配置依赖

#rabbitmqspring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/#u6D88u8D39u8005u6570u91CF消费者数量spring.rabbitmq.listener.simple.concurrency= 10spring.rabbitmq.listener.simple.max-concurrency= 10#u6D88u8D39u8005u6BCFu6B21u4ECEu961Fu5217u83B7u53D6u7684u6D88u606Fu6570u91CFspring.rabbitmq.listener.simple.prefetch= 1#u6D88u8D39u8005u81EAu52A8u542Fu52A8spring.rabbitmq.listener.simple.auto-startup=true#u6D88u8D39u5931u8D25uFF0Cu81EAu52A8u91CDu65B0u5165u961Fspring.rabbitmq.listener.simple.default-requeue-rejected= true#u542Fu7528u53D1u9001u91CDu8BD5重置spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3spring.rabbitmq.template.retry.max-interval=10000spring.rabbitmq.template.retry.multiplier=1.0

创建RabbitMQ的相关配置package

MQConfig.class(RabbitMQ配置类)MQSender.class(RabbitMQ发送者类)MQReceiver.class(RabbitMQ消息接收者类)

MQConfig.class类

import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;//mq的配置类@Configurationpublic class MQConfig {public static final String QUEUE="queue";//注入一个bean实体类@Beanpublic Queue queue(){return new Queue(QUEUE,true);//true代表是否持续}}

4.创建消息发送者MQSender.class

//service注解必须要带上@Servicepublic class MQSender {//log为打印辅助工具,建议采用log打印,而不采用system打印private static Logger log = LoggerFactory.getLogger(MQSender.class);//注入依赖引擎@AutowiredAmqpTemplate amqpTemplate;//发送数据public void send(String message){//RedisService.beanToString转化为String类型的设计log.info("发送的数据"+message);//MQConfig.QUEUE为发送队列amqpTemplate.convertAndSend(MQConfig.QUEUE,message);}}

5.创建消息发送者MQReceiver.class

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;//MQ的接收者import com.imooc.miaosha.controller.GoodsController;//设置服务注解@Servicepublic class MQReceiver {private static Logger log = LoggerFactory.getLogger(MQReceiver.class);//对指定消息队列进行监听@RabbitListener(queues=MQConfig.QUEUE)public void receiver(String message){log.info("接受的信息"+message);}}

6.创建一个Controller,进行模拟请求信息的发送接收处理
编写请求测试

//注入发送者@AutowiredMQSender mqSender; @RequestMapping("/mq") @ResponseBody public String home() { mqSender.send("hello RabbitMQ");//发送消息 return "success"; }

浏览器输入
http://localhost:8080/mq

结果:

rabbitmq.MQReceiver : 发送给的数据hello RabbitMQrabbitmq.MQReceiver : 接受的信息hello RabbitMQ

Springboot集成Topic交换机模式实现

引入依赖(上述依赖相同)实现思路:要发送的消息,首先由一个队列mq1-------> 交换机--------->然后将其消息进行转发到mq2创建两个模拟队列,queue1和queue2

public static final String TOPTIC_QUEUE1="topicqueue1";public static final String TOPIC_QUEUE2="topicqueue2";//Topic模式的消息队列1@Beanpublic Queue topicQueue1(){return new Queue(TOPTIC_QUEUE1,true);}//Topic模式下的消息队列2@Beanpublic Queue topicQueue2(){return new Queue(TOPIC_QUEUE2,true);}

创建Topic交换机

import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;public static final String TOPIC_EXCHAGE="topicExchange";//交换机Topic模式@Beanpublic TopicExchange topicExchange(){return new TopicExchange(TOPIC_EXCHAGE);}

将其消息队列绑定在交换机上

public static final String ROUTING_KEY1="topic.Key1";public static final String ROUTING_KEY2="topic.#";//进行绑定@Beanpublic Binding topicBinding1(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);}@Beanpublic Binding topicBinding2(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);}

设置消息发送者MQSender.class

这里设置一个Topic交换机式的方法,目的在于检测消息的匹配度。

//topic交换方法//发送两条信息//第一条是携带着匹配符号topic.key1的信息//第二条是携带者匹配符号topic.#的信息public void sendTopic(String message){log.info("topic模式交换机下的发送的信息:"+message);amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHAGE,"topic.Key1",message+"1");amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHAGE,"topic.Key2",message+"2");}

推理下:
携带着topic.key1的信息只能让mq消息队列queue1来接收,而queue2却不能
原因:

public static final String ROUTING_KEY1="topic.Key1";public static final String ROUTING_KEY2="topic.#";//进行绑定@Beanpublic Binding topicBinding1(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);}@Beanpublic Binding topicBinding2(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);}

这两个队列一个绑定携带着只有topic.Key1,只能识别topic.Key1
而第二个是topic.#可以识别topic.XXXX任何信息。
因此得到的结果应该是

mq1和mq2都接受到了topic.Key1匹配幅的信息
而第二条信息(携带着topic.Key2的信息)只能由mqqueue2来接收

检验 设置MQReciver.class

作为接收者
只需要设置两个监听RabbitMQListener进行监听两个消息队列。

@RabbitListener(queues=MQConfig.TOPTIC_QUEUE1)public void receiverTopinc1(String message){log.info("topicQueue1接收的信息:"+message);}@RabbitListener(queues=MQConfig.TOPIC_QUEUE2)public void receiverTopic2(String message){log.info("topicQueue2接收的信息:"+message);}

下面是编写Controller请求进行测试是否是按照预期我们想的那样:

@AutowiredMQSender mqSender; @RequestMapping("/mqTopic") @ResponseBody public String mqTopic() { mqSender.sendTopic("hello RabbitMQ---Topic模式"); return "success";//象征性的输出一个结果 }

结果

浏览器输入:
http://localhost:8080/demo/mqTopic

查看控制台:

rabbitmq.MQReceiver : topic模式交换机下的发送的信息:hello RabbitMQ---Topic模式rabbitmq.MQReceiver : topicQueue2接受的信息:hello RabbitMQ---Topic模式1rabbitmq.MQReceiver : topicQueue2接受的信息:hello RabbitMQ---Topic模式2rabbitmq.MQReceiver : topicQueue1接受的信息:hello RabbitMQ---Topic模式1

分析:
正如我们上面预测的一样:

hello RabbitMQ---Topic模式1这一条信息rabbitmq.MQReceiver : topicQueue2接受的信息:hello RabbitMQ---Topic模式1rabbitmq.MQReceiver : topicQueue1接受的信息:hello RabbitMQ---Topic模式1这两个队列都收到了,因此topic.#和topic.Key1都识别topic.Key1但是hello RabbitMQ---Topic模式2这一条信息只有queue2收到了,说明,topic.#可以识别topic.Key2而topic.Key1不可以识别topic.Key2

到此Direct交换机模式和Topic交换机模式的简单测试实现!
剩下的两个模式:
Fanout Exchange模式
Headers Exchage模式
下一篇再介绍!

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。