生产者将消息发送到Exchange,由Exchange再路由到一个或多个队列中
路由键 (RoutingKey)
生产者将消息发送给交换机时,会通过指定RoutingKey来指定路由规则
绑定键 (BindingKey)
通过绑定键将交换机和队列关联起来,这样RabbitMQ就知道如何正确地将消息路由到队列
关系小结
交换机类型:直连交换机:Direct exchange;主题交换机:Topic exchange;扇形交换机:Fanout exchange;首部交换机:Headers exchange;默认交换机:Default exchange;死信交换机:Dead Letter Exchange。
生产者发送到Exchange的消息最终被路由到哪个队列,是由RoutingKey决定的;Exchange与哪个队列绑定,是由BindingKey决定的。
交换机的属性
一.直连交换机 1.新建DirectConfig类
创建队列
创建交换机
进行交换机和队列的绑定:设置BindingKey
package com.lj.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the direct-exchange topology: three durable queues, one direct
 * exchange named {@code directExchange}, and one binding per queue.
 *
 * <p>Routing keys: {@code aa} -> queue A, {@code bb} -> queue B,
 * {@code cc} -> queue C.
 *
 * <p>The queue names keep the original {@code direcr...} spelling on purpose:
 * queues are looked up by name, so renaming them here would break any
 * consumer that subscribes to the existing names.
 */
@Configuration
@SuppressWarnings("all")
public class DirectConfig {

    /** Durable queue {@code direcrQueueA}. */
    @Bean
    public Queue direcrQueueA() {
        return new Queue("direcrQueueA", true);
    }

    /** Durable queue {@code direcrQueueB}. */
    @Bean
    public Queue direcrQueueB() {
        return new Queue("direcrQueueB", true);
    }

    /** Durable queue {@code direcrQueueC}. */
    @Bean
    public Queue direcrQueueC() {
        return new Queue("direcrQueueC", true);
    }

    /** The direct exchange that all three queues are bound to. */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    /**
     * Binds queue A to the exchange with binding key {@code aa}.
     *
     * <p>Must be a {@code @Bean}: a binding that is not registered in the
     * application context is never declared on the broker.
     */
    @Bean
    public Binding BindingA() {
        return BindingBuilder.bind(direcrQueueA()).to(directExchange()).with("aa");
    }

    /** Binds queue B (not queue A) to the exchange with binding key {@code bb}. */
    @Bean
    public Binding BindingB() {
        return BindingBuilder.bind(direcrQueueB()).to(directExchange()).with("bb");
    }

    /** Binds queue C (not queue A) to the exchange with binding key {@code cc}. */
    @Bean
    public Binding BindingC() {
        return BindingBuilder.bind(direcrQueueC()).to(directExchange()).with("cc");
    }
}
2.建一个ProviderController发送信息package com.lj.provider;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@SuppressWarnings("all")public class ProviderController { @Autowired private RabbitTemplate template; @RequestMapping("/directSend") public String DirectSend(String routingKey){ template.convertAndSend("directExchange",routingKey,"Hello World"); return "yes"; }}
这里面是没有Queue的
运行成功
3.建立3个DirectReceiver类接收消息package com.lj.consumer.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@SuppressWarnings("all")@RabbitListener(queues = "direcrQueueA")@Slf4jpublic class DirectReceiverA { @RabbitHandler public void process(String msg){ log.warn("A接到"+msg); }}
二.主题交换机 1.在生产者新建 TopicConfig
package com.lj.provider.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@SuppressWarnings("all")public class TopicConfig { public final static String KEY_A="*.orange.*"; public final static String KEY_B="*.*.rabbit"; public final static String KEY_C="lazy.#"; @Bean public Queue topicQueueA(){ return new Queue("topicQueueA",true); } @Bean public Queue topicQueueB(){ return new Queue("topicQueueB",true); } @Bean public Queue topicQueueC(){ return new Queue("topicQueueC",true); } @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding topicBindingA(){ return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A); } @Bean public Binding topicBindingB(){ return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B); } @Bean public Binding topicBindingC(){ return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C); }}
2.在 Controller 里面加一个方法@RequestMapping("/topicSend") public String topicSend(String routingKey){ template.convertAndSend("topicExchange",routingKey,"Hello World"); return "yes"; }
3.在消费者创建3个接收者TopicReceiverpackage com.lj.consumer.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@SuppressWarnings("all")@RabbitListener(queues = "topicQueueA")@Slf4jpublic class TopicReceiverA { @RabbitHandler public void process(String msg){ log.warn("A接到"+msg); }}
三.扇形交换机 1.建立FanoutConfig
package com.lj.provider.mq;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@SuppressWarnings("all")public class FanoutConfig { @Bean public Queue fanoutQueueA(){ return new Queue("fanoutQueueA",true); } @Bean public Queue fanoutQueueB(){ return new Queue("fanoutQueueB",true); } @Bean public Queue fanoutQueueC(){ return new Queue("fanoutQueueC",true); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } @Bean public Binding fanoutBindingA(){ return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange()); } @Bean public Binding fanoutBindingB(){ return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange()); } @Bean public Binding fanoutBindingC(){ return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange()); }}
2.在controller加方法@RequestMapping("/fanoutSend") public String fanoutSend(){ template.convertAndSend("fanoutExchange",null,"Hello World"); return "yes"; }
3.在消费者创建3个接收者FanoutReceiverpackage com.lj.consumer.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@SuppressWarnings("all")@RabbitListener(queues = "fanoutQueueA")@Slf4jpublic class FanoutReceiverA { @RabbitHandler public void process(String msg){ log.warn("A接到"+msg); }}