
RabbitMQ Exchanges Explained

Date: 2023-04-27

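Exchanges

Exchange

In RabbitMQ, a producer does not deliver messages directly to a queue. It publishes each message to an exchange, the exchange forwards it to the appropriate queue or queues, and the queue then hands the message to consumers by push or pull.

The producer sends a message to an Exchange, and the Exchange routes it to one or more queues.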

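Routing key (Routingkey)

When a producer sends a message to an exchange, it specifies a routing key, which the exchange uses to apply its routing rules.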

Binding key (Bindingkey)

A binding key associates an exchange with a queue, so that RabbitMQ knows how to route messages to the right queue.

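Summary of the relationship

The producer chooses the exchange when it publishes, and each message carries a routing key. Which queues are attached to an exchange, and under what condition, is determined by the binding key. The exchange compares the message's routing key with the binding keys to decide which queues receive the message.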
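The relationship between routing key and binding key can be sketched with a small in-memory model. This is not RabbitMQ client code: the class DirectRoutingSketch and the queue names queueA and queueB are hypothetical, and the model only imitates the exact-match rule of a direct exchange.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a direct exchange (not RabbitMQ client code). */
public class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    /** Bind a queue to this exchange under the given binding key. */
    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Return the queues whose binding key equals the message's routing key. */
    public List<String> route(String routingKey) {
        return List.copyOf(bindings.getOrDefault(routingKey, List.of()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("queueA", "AA");
        exchange.bind("queueB", "BB");
        exchange.bind("queueA", "BB"); // the same key may be bound to several queues

        System.out.println(exchange.route("AA")); // [queueA]
        System.out.println(exchange.route("BB")); // [queueB, queueA]
        System.out.println(exchange.route("ZZ")); // [] (no binding matches)
    }
}
```

In this model a message whose routing key matches no binding key reaches no queue, which is why the key used by the producer and the key used in the binding have to agree.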

 

 

 

 

1. Direct exchange (direct)

provider

Create two classes.

ProviderController 

package com.smy.provider;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProviderController {

    @Autowired
    private RabbitTemplate template;

    // Publish a test message to directExchange, using the request parameter as the routing key.
    @RequestMapping("/directSend")
    public String directSend(String routingkey) {
        template.convertAndSend("directExchange", routingkey, "Hello World");
        return "yes";
    }
}

Create an mq package.

DirectConfig  

package com.smy.provider.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    // Three durable queues (second constructor argument: durable = true).
    @Bean
    public Queue directQueueA() {
        return new Queue("directQueueA", true);
    }

    @Bean
    public Queue directQueueB() {
        return new Queue("directQueueB", true);
    }

    @Bean
    public Queue directQueueC() {
        return new Queue("directQueueC", true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    // Bind each queue to the exchange under its own binding key.
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
    }

    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
    }

    @Bean
    public Binding bindingC() {
        return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
    }
}

Run the provider.

comsuner (the consumer)

Create an mq package.

In it, create three classes, one listening on each queue.

DirectReceiverA 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueueA")
@Slf4j
public class DirectReceiverA {

    @RabbitHandler
    public void process(String message) {
        log.info("A received " + message);
    }
}

DirectReceiverB 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueueB")
@Slf4j
public class DirectReceiverB {

    @RabbitHandler
    public void process(String message) {
        log.info("B received " + message);
    }
}

 DirectReceiverC 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueueC")
@Slf4j
public class DirectReceiverC {

    @RabbitHandler
    public void process(String message) {
        log.info("C received " + message);
    }
}

Restart the producer (provider) and the consumer (comsuner).

2. Topic exchange (topic)

provider

Create TopicConfig.

package com.smy.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // Binding patterns: words are separated by dots and may contain the wildcards * and #.
    public final static String KEY_A = "*.orange.*";
    public final static String KEY_B = "*.*.rabbit";
    public final static String KEY_C = "lazy.#";

    @Bean
    public Queue topicQueueA() {
        return new Queue("topicQueueA", true);
    }

    @Bean
    public Queue topicQueueB() {
        return new Queue("topicQueueB", true);
    }

    @Bean
    public Queue topicQueueC() {
        return new Queue("topicQueueC", true);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding TopicbindingA() {
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
    }

    @Bean
    public Binding TopicbindingB() {
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
    }

    @Bean
    public Binding TopicbindingC() {
        return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
    }
}
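The three patterns in TopicConfig rely on the topic wildcards: * stands for exactly one dot-separated word, and # stands for zero or more words. The following is a minimal sketch of that matching rule, useful for predicting which queues a given routing key should reach. The class TopicMatchSketch is hypothetical and only imitates the rule; it is not how the broker implements it.

```java
/** Hypothetical sketch of topic-pattern matching (not the broker's implementation). */
public class TopicMatchSketch {

    /** True if the dot-separated routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] patterns = {"*.orange.*", "*.*.rabbit", "lazy.#"};
        String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "lazy", "quick.orange.male.rabbit"};
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " vs " + pattern + " -> " + matches(pattern, key));
            }
        }
    }
}
```

Under this rule, quick.orange.rabbit matches KEY_A and KEY_B, lazy.orange.elephant matches KEY_A and KEY_C, and quick.orange.male.rabbit matches none of the three because it has four words.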

Add this method to the ProviderController class.

    @RequestMapping("/topicSend")
    public String topicSend(String routingkey) {
        template.convertAndSend("topicExchange", routingkey, "Hello World");
        return "yes";
    }

Create three classes in comsuner.

topicReceiverA 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class topicReceiverA {

    @RabbitHandler
    public void process(String message) {
        log.warn("A received " + message);
    }
}

topicReceiverB  

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueueB")
@Slf4j
public class topicReceiverB {

    @RabbitHandler
    public void process(String message) {
        log.warn("B received " + message);
    }
}

topicReceiverC  

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueueC")
@Slf4j
public class topicReceiverC {

    @RabbitHandler
    public void process(String message) {
        log.warn("C received " + message);
    }
}

 

3. Fanout exchange (fanout)

provider

FanoutConfig: none of the bindings needs a key.

package com.smy.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    @Bean
    public Queue fanoutQueueA() {
        return new Queue("fanoutQueueA", true);
    }

    @Bean
    public Queue fanoutQueueB() {
        return new Queue("fanoutQueueB", true);
    }

    @Bean
    public Queue fanoutQueueC() {
        return new Queue("fanoutQueueC", true);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    // Fanout bindings take no key: every bound queue receives every message.
    @Bean
    public Binding fanoutbindingA() {
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutbindingB() {
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutbindingC() {
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }
}

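Keep the routing-key argument but pass an empty value. If the key argument is dropped, the call resolves to the two-argument overload of convertAndSend, and the exchange name is then treated as the routing key.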

    @RequestMapping("/fanoutSend")
    public String fanoutSend(String routingkey) {
        template.convertAndSend("fanoutExchange", null, "Hello World");
        return "yes";
    }
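Why the key argument has to stay can be seen from how Java picks between overloads. The sketch below is a hypothetical stand-in, not RabbitTemplate itself: SendOverloadSketch mirrors only the two-argument (routing key, message) and three-argument (exchange, routing key, message) shapes of convertAndSend, and it assumes the template's default exchange is the empty-named one.

```java
/** Hypothetical stand-in that mirrors two convertAndSend overload shapes. */
public class SendOverloadSketch {

    // Assumption: the template's default exchange is the nameless one.
    static final String DEFAULT_EXCHANGE = "";

    /** Two-argument shape: the first argument is the routing key. */
    public static String convertAndSend(String routingKey, Object message) {
        return convertAndSend(DEFAULT_EXCHANGE, routingKey, message);
    }

    /** Three-argument shape: exchange first, then routing key. */
    public static String convertAndSend(String exchange, String routingKey, Object message) {
        return "exchange='" + exchange + "', routingKey='" + routingKey + "'";
    }

    public static void main(String[] args) {
        // Key argument dropped: "fanoutExchange" is read as the routing key.
        System.out.println(convertAndSend("fanoutExchange", "Hello World"));
        // prints: exchange='', routingKey='fanoutExchange'

        // Key argument kept but empty: the message targets fanoutExchange.
        System.out.println(convertAndSend("fanoutExchange", "", "Hello World"));
        // prints: exchange='fanoutExchange', routingKey=''
    }
}
```

Passing an empty string instead of null is an equally valid choice here, since a fanout exchange ignores the routing key either way.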

comsuner

FanoutReceiverA 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanoutQueueA")
@Slf4j
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String message) {
        log.error("A received " + message);
    }
}

 FanoutReceiverB 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanoutQueueB")
@Slf4j
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String message) {
        log.error("B received " + message);
    }
}

FanoutReceiverC 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanoutQueueC")
@Slf4j
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String message) {
        log.error("C received " + message);
    }
}

 

 

 

 

This is because BB is bound to both A and B.

 
