在rabbitmq中,生产者发信息不会直接将信息投递到队列中,而是先将信息投递到交换机中,在交换机转发在具体的队列,队列再将信息推送或者拉取消费者进行消费
路由键(Routingkey)
生产者将信息发送给交换机的时候 会指定Routingkey指定路由规则
绑定键(Bindingkey)
二,交换机类型通过绑定键将交换机与队列关联起来,这样rabbtamq就知道如何正确的将信息路由到队列
直接交换机:Direct exchange
直接交换机的路由算法非常简单:将信息推送到binding key与改信息的routing key相同队列
三,主题交换机:Topic exchange直接交换机x上绑定了两个队列
第一个队列绑定了绑定键orange 第二个队列有两个绑定键一个black和green
直接交换机的特点:
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送多个队列,那么这个交换机需要绑定上非常多的routing_key
假设每个交换机上都绑定一堆的routing_key连接到各个队列上,那么消息管理就会异常困难
主题交换机的特点:
例题
扇形交换机:Fanout exchange首部交换机:Headers exchange 默认交换机:扇形交换机时最基本的交换机类型 它所能做的事情非常简单 广播消息
扇形交换机会把能接到的消息全部发送给绑定在直接身上的队列 因为广播不需要‘思考’
所以扇形交换机处理的消息的速度是所有交换机类型里最快的
死信交换机 dead letter exchange实际上是一个由rabbitmq预先声明好的名字为空字符串的直连交换机(direct exchange)
它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定在默认的交换机上,绑定的路由键(routing key)名称与队列名称相同
rabbitmq作为一个高级消息中间件 提出了死信交换器的概念
这种交换器专门处理死了的消息(被拒绝可以重新投递的信息不能算死的)
订单的超时处理 交换机的属性消息变成死信一般是以下三种情况:
消息被拒绝 并且设置requeue参数false
消息过期(默认情况下rabbit中的消息不过期 但是可以设置队列的过期时间和消息的过期时间以达到消息过期的效果)
队列达到最大长度(一般当设置了最大队列长度或大小并达到最大值时)
当满足这三种情况,消息会变成死信消息 并通过死信交换机投递到相应的队列中
我们只需要监听相应队列 就可以对死信消息进行最后的处理
①直连交换机(生产者name:交换机名称
type:交换机类型,direct,topic,tanout,headers
durability:是否需要持久化,如果持久性 则rabbitmq重启后 交换机还存在
auto delete:当最后一个绑定到exchange上的队列删除后 自动删除该exchange
internal:当前exchange是否用于rabbitmq内部使用 默认为false
arguments:扩展参数 用于扩展mop协议定制化使用
Directconfig.java(生产者)
package com.hmf.producer.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@SuppressWarnings("all")public class Directconfig { //1.创建队列 @Bean public Queue directQueueA(){ return new Queue("directQueueA",true); } @Bean public Queue directQueueB(){ return new Queue("directQueueB",true); } @Bean public Queue directQueueC(){ return new Queue("directQueueC",true); } //2.创建交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); } //3.交换机和队列的绑定:设置banding @Bean public Binding bindingA(){ return BindingBuilder.bind(directQueueA()).to(directExchange()).with("a"); } @Bean public Binding bindingB(){ return BindingBuilder.bind(directQueueB()).to(directExchange()).with("b"); } @Bean public Binding bindingC(){ return BindingBuilder.bind(directQueueC()).to(directExchange()).with("c"); }}
ProducerController.java
package com.hmf.producer;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@SuppressWarnings("all")public class ProducerController { @Autowired private RabbitTemplate template; @RequestMapping("/directSend") public String directSend(String routingKey){ template.convertAndSend("directExchange",routingKey,"Hello"); return "okok"; }}
结果
访问成功
①直连交换机(消费者DirectReceiverC.java(消费者)
package com.hmf.cunsumer.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component//固定 跟着容器一起启动@SuppressWarnings("/all")//消息队列的监听器@RabbitListener(queues = "directQueueC")@Slf4jpublic class DirectReceiverC { @RabbitHandler public void process(String message){ log.info("C接到"+message); }}
访问成功
②主题交换机(生产者TopicConfig.java(生产者
package com.hmf.producer.mq;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@SuppressWarnings("all")public class TopicConfig { private static String Key_A="*.orange.*"; private static String Key_B="*.*.rabbit"; private static String Key_C="lazy.#"; //1.创建队列 @Bean public Queue topicQueueA(){ return new Queue("topicQueueA",true); } @Bean public Queue topicQueueB(){ return new Queue("topicQueueB",true); } @Bean public Queue topicQueueC(){ return new Queue("topicQueueC",true); } //2.创建交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } //3.交换机和队列的绑定:设置banding @Bean public Binding topicBindingA(){ return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(Key_A); } @Bean public Binding topicBindingB(){ return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(Key_B); } @Bean public Binding topicBindingC(){ return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(Key_C); }}
ProducerController.java
package com.hmf.producer;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@SuppressWarnings("all")public class ProducerController { @Autowired private RabbitTemplate template; @RequestMapping("/directSend") public String directSend(String routingKey){ template.convertAndSend("directExchange",routingKey,"Hello"); return "okok"; } @RequestMapping("/topicSend") public String topicSend(String routingKey){ template.convertAndSend("topicExchange",routingKey,"Hello"); return "okok"; }}
访问成功
②主题交换机(消费者
TopicReceiverA.java(消费者
TopicReceiverA.java
package com.hmf.cunsumer.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component//固定 跟着容器一起启动@SuppressWarnings("/all")//消息队列的监听器@RabbitListener(queues = "directQueueA")@Slf4jpublic class TopicReceiverA { @RabbitHandler public void process(String message){ log.warn("A接到"+message); }}
访问成功
③扇形交换机(生产者注:要绑定 不要键
Fanoutconfig.java
package com.hmf.producer.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@SuppressWarnings("all")public class Fanoutconfig { //1.创建队列 @Bean public Queue fanoutQueueA(){ return new Queue("fanoutQueueA",true); } @Bean public Queue fanoutQueueB(){ return new Queue("fanoutQueueB",true); } @Bean public Queue fanoutQueueC(){ return new Queue("fanoutQueueC",true); } //2.创建交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } //3.交换机和队列的绑定:设置banding @Bean public Binding fanoutbindingA(){ return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange()); } @Bean public Binding fanoutbindingB(){ return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange()); } @Bean public Binding fanoutbindingC(){ return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange()); }}
ProducerController.java
package com.hmf.producer;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@SuppressWarnings("all")public class ProducerController { @Autowired private RabbitTemplate template; @RequestMapping("/directSend") public String directSend(String routingKey){ template.convertAndSend("directExchange",routingKey,"Hello"); return "okok"; } @RequestMapping("/topicSend") public String topicSend(String routingKey){ template.convertAndSend("topicExchange",routingKey,"Hello"); return "okok"; } @RequestMapping("/fanoutSend") public String fanoutSend(){ //键写成了null 不然它直接进入到第二个参数 否则会报错 template.convertAndSend("fanoutExchange",null,"Hello"); return "okok"; }}
访问成功
③扇形交换机(消费者
FanoutReceiverA.java
package com.hmf.cunsumer.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component//固定 跟着容器一起启动@SuppressWarnings("/all")//消息队列的监听器@RabbitListener(queues = "directQueueA")@Slf4jpublic class FanoutReceiverA { @RabbitHandler public void process(String message){ log.error("A接到"+message); }}
访问成功
可以增加数据进行多个绑定
访问一下
因为绑定了两个一个a和b 使用两个都接收到了
如果要解绑
点击unbind就可以啦