RabbitMQ 02. Exchanges explained

Date: 2023-04-22

Preface

Exchange

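A producer sends its messages to an Exchange, and the Exchange then routes each message to one or more queues.

Routing key (RoutingKey)

When a producer sends a message to an exchange, it specifies a RoutingKey, which the exchange uses to apply its routing rules.

Binding key (BindingKey)

A BindingKey is set when a queue is bound to an exchange. The binding associates the two, so that RabbitMQ knows how to route messages to the correct queue.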

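Summary of the relationships

The producer names the Exchange it publishes to and attaches a RoutingKey to each message. Each queue is bound to the Exchange with a BindingKey. The Exchange compares the message's RoutingKey with the BindingKeys of its bindings to decide which queues receive the message.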
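The relationship between RoutingKey and BindingKey can be illustrated with a small in-memory model. This is only a sketch of direct-exchange matching (exact string equality), not the RabbitMQ client API; the class name DirectRoutingSketch and the queue names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange. Hypothetical class, not part of any library.
final class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    // Bind a queue to this exchange under the given binding key.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Return every queue whose binding key equals the message's routing key.
    List<String> route(String routingKey) {
        return new ArrayList<>(
                bindings.getOrDefault(routingKey, Collections.<String>emptyList()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("queueA", "aa");
        exchange.bind("queueB", "bb");
        exchange.bind("queueC", "aa"); // two queues may share one binding key

        System.out.println(exchange.route("aa")); // prints [queueA, queueC]
        System.out.println(exchange.route("zz")); // prints []
    }
}
```

A routing key that matches no binding key selects no queue at all, which is why the last call returns an empty list.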

Exchange types

Direct exchange
Topic exchange
Fanout exchange
Headers exchange
Default exchange
Dead Letter Exchange

 

Exchange properties

 

I. Direct exchange

1. Create the DirectConfig class

Create the queues

Create the exchange

Bind the exchange to the queues and set the BindingKey

package com.lj.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class DirectConfig {

    // Durable queues (second constructor argument is true).
    @Bean
    public Queue direcrQueueA() {
        return new Queue("direcrQueueA", true);
    }

    @Bean
    public Queue direcrQueueB() {
        return new Queue("direcrQueueB", true);
    }

    @Bean
    public Queue direcrQueueC() {
        return new Queue("direcrQueueC", true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    // Each binding must be a @Bean, otherwise it is never declared on the broker.
    // Each queue gets its own BindingKey: aa -> A, bb -> B, cc -> C.
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(direcrQueueA()).to(directExchange()).with("aa");
    }

    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(direcrQueueB()).to(directExchange()).with("bb");
    }

    @Bean
    public Binding bindingC() {
        return BindingBuilder.bind(direcrQueueC()).to(directExchange()).with("cc");
    }
}

2. Create a ProviderController to send messages

package com.lj.provider;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SuppressWarnings("all")
public class ProviderController {

    @Autowired
    private RabbitTemplate template;

    // Publishes "Hello World" to directExchange with the routing key taken from the request parameter.
    @RequestMapping("/directSend")
    public String directSend(String routingKey) {
        template.convertAndSend("directExchange", routingKey, "Hello World");
        return "yes";
    }
}

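Note that no Queue appears in this code: the producer only specifies the exchange and the routing key.

It runs successfully.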

3. Create three DirectReceiver classes to receive the messages

package com.lj.consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Receiver for direcrQueueA. The receivers for the other two queues
// differ only in the class name and the queue name.
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "direcrQueueA")
@Slf4j
public class DirectReceiverA {

    @RabbitHandler
    public void process(String msg) {
        log.warn("A received " + msg);
    }
}

 

II. Topic exchange

1. Create TopicConfig in the producer

package com.lj.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class TopicConfig {

    // Binding patterns: '*' matches exactly one word, '#' matches zero or more words.
    public final static String KEY_A = "*.orange.*";
    public final static String KEY_B = "*.*.rabbit";
    public final static String KEY_C = "lazy.#";

    @Bean
    public Queue topicQueueA() {
        return new Queue("topicQueueA", true);
    }

    @Bean
    public Queue topicQueueB() {
        return new Queue("topicQueueB", true);
    }

    @Bean
    public Queue topicQueueC() {
        return new Queue("topicQueueC", true);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding topicBindingA() {
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
    }

    @Bean
    public Binding topicBindingB() {
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
    }

    @Bean
    public Binding topicBindingC() {
        return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
    }
}

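2. Add a method to the Controller

    // Publishes "Hello World" to topicExchange with the routing key taken from the request parameter.
    @RequestMapping("/topicSend")
    public String topicSend(String routingKey) {
        template.convertAndSend("topicExchange", routingKey, "Hello World");
        return "yes";
    }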

3. Create three TopicReceiver classes in the consumer

package com.lj.consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Receiver for topicQueueA. The receivers for the other two queues
// differ only in the class name and the queue name.
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class TopicReceiverA {

    @RabbitHandler
    public void process(String msg) {
        log.warn("A received " + msg);
    }
}
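To predict which topic receiver will log a message for a given routing key, it can help to see the wildcard rules spelled out. The following is a minimal sketch of topic-style matching in plain Java, assuming the usual rules that words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. TopicMatchSketch is a hypothetical helper written for this illustration; the broker's own matching is implemented differently.

```java
// Hypothetical helper illustrating topic binding-key matching; not a RabbitMQ or Spring API.
final class TopicMatchSketch {

    // True if routingKey satisfies the binding pattern.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: all words must be consumed too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation point.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] bindingKeys = {"*.orange.*", "*.*.rabbit", "lazy.#"};
        String[] routingKeys = {"quick.orange.rabbit", "lazy.brown.fox", "orange"};
        for (String routingKey : routingKeys) {
            for (String bindingKey : bindingKeys) {
                System.out.println(routingKey + " vs " + bindingKey + ": "
                        + matches(bindingKey, routingKey));
            }
        }
    }
}
```

With the three binding keys used in TopicConfig, quick.orange.rabbit satisfies *.orange.* and *.*.rabbit but not lazy.#, lazy.brown.fox satisfies only lazy.#, and the single word orange satisfies none of them.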

 

III. Fanout exchange

1. Create FanoutConfig

package com.lj.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class FanoutConfig {

    @Bean
    public Queue fanoutQueueA() {
        return new Queue("fanoutQueueA", true);
    }

    @Bean
    public Queue fanoutQueueB() {
        return new Queue("fanoutQueueB", true);
    }

    @Bean
    public Queue fanoutQueueC() {
        return new Queue("fanoutQueueC", true);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    // Fanout bindings take no BindingKey: every bound queue receives every message.
    @Bean
    public Binding fanoutBindingA() {
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBindingB() {
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBindingC() {
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }
}

2. Add a method to the Controller

    // A fanout exchange ignores the routing key, so an empty string is passed.
    @RequestMapping("/fanoutSend")
    public String fanoutSend() {
        template.convertAndSend("fanoutExchange", "", "Hello World");
        return "yes";
    }

3. Create three FanoutReceiver classes in the consumer

package com.lj.consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Receiver for fanoutQueueA. The receivers for the other two queues
// differ only in the class name and the queue name.
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "fanoutQueueA")
@Slf4j
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        log.warn("A received " + msg);
    }
}
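For contrast with the direct and topic exchanges, fanout behaviour can be modelled in a few lines: the routing key plays no part, and every bound queue gets a copy. This is a toy in-memory sketch, not broker or Spring code; FanoutSketch is a hypothetical class name, and the queue names are the ones declared in FanoutConfig.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a fanout exchange. Hypothetical class, not part of any library.
final class FanoutSketch {
    private final List<String> boundQueues = new ArrayList<>();

    // Fanout bindings carry no binding key.
    void bind(String queueName) {
        boundQueues.add(queueName);
    }

    // The routing key is accepted but deliberately ignored.
    List<String> route(String routingKey) {
        return new ArrayList<>(boundQueues);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanoutQueueA");
        exchange.bind("fanoutQueueB");
        exchange.bind("fanoutQueueC");

        // prints [fanoutQueueA, fanoutQueueB, fanoutQueueC] for any routing key
        System.out.println(exchange.route("anything"));
    }
}
```

This is why a single call to /fanoutSend should make all three receivers log the message, whatever routing key is supplied.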

 
