通常谈到队列服务,会有三个概念:发消息者、队列、收消息者,RabbitMq在这个基础概念之上,多做了一层抽象,在发送消息和接收消息之间加入了交换器(Exchange)。发送消息者跟收消息者没有直接联系,发消息者把消息给交换器,交换器根据调度策略把消息再给队列
虚拟主机:一个虚拟机持有一组交换机、队列和绑定。再rabbitmq中,只能以虚拟主机的粒度进行控制权限,如果想要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建虚拟主机交换机:Exchange用于转发消息,它不会做存储,如果没有Queue bind绑定到Exchange的话,会直接丢弃Producer发送过来的消息。消息到交换机需要通过 路由键 进行转发到具体的队列中绑定:交换机需要和队列相绑定,多对多关系 2、交换机
交换机不存储消息,只做消息的转换,在启用ack模式后,交换机找不到队列会返回错误
Direct:直接交换机,”先匹配,再投送“,即在绑定时设置一个routing_key,消息的routing_key匹配时才会被交换机进行投送Topic:按规则转发消息Headers:设置 header attribute参数类型的交换机Fanout:转发消息到所有绑定队列 2.1 Direct直接交换机
rabbitmq默认交换机,采用全文匹配的方式,例如:T - D1 绑定ey为 QD1,如果发送消息中的路由键匹配到QD1,就会进行转发
2.2 Topic交换机Topic Exchange转发消息主要是根据通配符。这种交换机会和队列绑定一种格式,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息
路由键必须是字符串,用句号(.) 隔开,例如:agreements.us路由匹配模式中的(*),主要用于匹配路由键指定位置的单词。例如:agreements.*.b,就会去匹配第一个arrements,以及第三个 b路由匹配模式中的(#), 表示多个单词 。例如: agreements.b.# 只要路由键 agreements.b 开头都会匹配 2.3 Headers交换机
相比较direct和topic固定的使用routing_key,headers则是一个自定义匹配规则类型,在队列与交换器绑定时会定义一组规则,只要消息中的键值对有一对匹配或者全部匹配时,就会被投送到对应队列
2.4 Fanout扇形交换机Fanout Exchange消息广播的模式,不管路由键或者是路由模式,会把消息发送给绑定给它的全部队列,如果配置了routing_key 会被忽略
3、Spring Boot集成Rabbitmq 3.1 引入依赖包使用spring封装的amqp进行操作
Queue:抽象队列对象,
durable:是否持久化,如果为false,rabbitmq启动时就会自动删除队列exclusive:声明了exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除autoDelete:所有客户端都断开连接时自动删除(用于临时使用) MessageExchangeBinding 3.3 消息队列
@Configurationpublic class QueueConfiguration { @Bean public Queue queue() { return new Queue(RabbitMqConst.HELLO_QUEUE_NAME); }}
3.4 生产者@Componentpublic class SimpleSender { @Resource private AmqpTemplate amqpTemplate; public void send() { String context = "helloMQ:" + new Date().getTime(); amqpTemplate.convertAndSend(RabbitMqConst.HELLO_QUEUE_NAME, context); }}
3.5 消费者@Component@RabbitListener(queues = RabbitMqConst.HELLO_QUEUE_NAME)public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("接收到数据:" + hello); }}
3.6 测试方法@SpringBootTest(classes = SpringRabbitmqDemoApplication.class)@RunWith(SpringJUnit4ClassRunner.class)public class SpringRabbitmqDemoApplicationTests { @Resource private SimpleSender simpleSender; @Test public void hello() { simpleSender.send(); }}
4、交换机测试注意:默认所有创建的队列以及交换机都会持久化,测试代码时可以设置为false,autoDelete可以设置为true
4.1 Direct交换机@Configurationpublic class QueueConfiguration { @Bean public Queue queue() { return new Queue(RabbitMqConst.HELLO_QUEUE_NAME); } @Bean public DirectExchange directExchange() { return new DirectExchange(RabbitMqConst.DIRECT_EXCHANGE_NAME); } @Bean public Binding bindingToDirect() { return BindingBuilder.bind(queue()).to(directExchange()).with("hello"); }}
@Componentpublic class SimpleSender { @Resource private AmqpTemplate amqpTemplate; public void sendToDirect() { String context = "sendDirect:" + new Date().getTime(); //注意第二个参数需要跟Binding中指定的routingkey对应上 amqpTemplate.convertAndSend(RabbitMqConst.DIRECT_EXCHANGE_NAME, "hello", context); }}
4.2 Topic交换机@Configurationpublic class QueueConfiguration { @Bean public Queue queue() { // 构造方法接收三个参数:durable(是否持久化), exclusive(是否是特殊化,最对第一个连接可见,断开后就删除), autoDelete(是否自动删除) return new Queue(RabbitMqConst.HELLO_QUEUE_NAME, false, false, false); } @Bean public TopicExchange topicExchange() { return new TopicExchange(RabbitMqConst.TOPIC_EXCHANGE_NAME); } @Bean public Binding bindingToTopic() { return BindingBuilder.bind(queue()).to(topicExchange()).with("hello.*(#).world"); }}
4.3 Fanout交换机@Configurationpublic class QueueConfiguration { @Bean public Queue queue() { // 构造方法接收三个参数:durable(是否持久化), exclusive(是否是特殊化,最对第一个连接可见,断开后就删除), autoDelete(是否自动删除) return new Queue(RabbitMqConst.HELLO_QUEUE_NAME, false, false, false); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitMqConst.FANOUT_EXCHANGE_NAME); } @Bean public Binding bindingToFanout() { return BindingBuilder.bind(queue()).to(fanoutExchange()); }}public void sendToFanout() { String context = "sendFanout:" + new Date().getTime(); //发送时,如果指定了routingkey也会失效 amqpTemplate.convertAndSend(RabbitMqConst.FANOUT_EXCHANGE_NAME, "hello.1.world", context); }
4.4 Headers交换机@Configurationpublic class QueueConfiguration { @Bean public Queue queue() { // 构造方法接收三个参数:durable(是否持久化), exclusive(是否是特殊化,最对第一个连接可见,断开后就删除), autoDelete(是否自动删除) return new Queue(RabbitMqConst.HELLO_QUEUE_NAME, false, false, false); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange(RabbitMqConst.HEADERS_EXCHANGE_NAME); } @Bean public Binding bindingToHeaders() { //匹配单个KV值 Binding single = BindingBuilder.bind(queue()).to(headersExchange()).where("msg").matches("hello"); //多个KV值进行匹配 Map
whereAny:匹配多个KV中的任意一个whereAll:所有的KV都必须匹配到 5、消息确认机制
spring: rabbitmq: host: 192.168.92.190 port: 5672 username: root password: root publisher-/confirm/is: true # 消息发送到交换机是否成功的回调函数 publisher-returns: true # 消息发送到对应队列失败调用的回调 virtual-host: / listener: simple: acknowledge-mode: manual # 采用手动答应 concurrency: 1 # 指定最小的消费者数量 max-concurrency: 1 # 指定最大的消费者数量 retry: enabled: true #开启消息重试
5.1 生产者@Componentpublic class SimpleSender { @Resource private AmqpTemplate amqpTemplate; public void send() { RabbitTemplate rabbitTemplate = (RabbitTemplate) this.amqpTemplate; //交换机投递到对应队列失败时,根据当前Mandatory 判断是否需要调用 ReturnCallback,配置文件中需要配置publisher-returns:true rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息成功发送到exchange...."); } else { System.out.println("消息发送exchange失败,原因:"+ cause); } });//交换机转发消息到对应队列失败时调用 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); System.out.printf("消息:%s,发送失败,应答码:%s,原因:%s,交换机:%s,路由键:%s%n", correlationId, replyCode, replyText, exchange, routingKey); }); String context = "helloMQ:" + new Date().getTime(); this.amqpTemplate.convertAndSend(RabbitMqConst.DIRECT_EXCHANGE_NAME, "hello", context); }}
/confirm/iCallback:消息确认函数,当前函数主要的目的是,消息是否发送到交换机时回调的函数ReturnCallback:消息返回回调函数,当前消息成功投递到交换机,但是没有投递到对应的队列时调用 5.2 消费者
@RabbitHandler @RabbitListener(queues = RabbitMqConst.HELLO_QUEUE_NAME) public void process(Message message, Channel channel) throws IOException { System.out.println("接收到数据:" + new String(message.getBody())); //消费者手动确认消息,如果没有确认,broker会将数据存储后继续发送 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
6、死信队列 DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到 x-dead-letter-exchange 对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列
消息被拒绝了 Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false消息过期了队列达到最大长度 6.1 队列配置
@Configurationpublic class DeadQueueConfiguration { @Bean public Queue queue() { return QueueBuilder .nonDurable(RabbitMqConst.TEST_DLX_QUEUE_NAME) .withArgument("x-dead-letter-exchange", RabbitMqConst.DLX_EXCHANGE) .build(); } @Bean public DirectExchange directExchange() { return new DirectExchange(RabbitMqConst.TEST_DLX_EXCHANGE); } @Bean public Binding bindingToDirect() { return BindingBuilder.bind(queue()).to(directExchange()).with("hello"); } @Bean public Queue deadQueue() { return QueueBuilder .nonDurable(RabbitMqConst.DLX_QUEUE_NAME) .build(); } @Bean public Exchange deadExchange() { return ExchangeBuilder .topicExchange(RabbitMqConst.DLX_EXCHANGE) .build(); } @Bean public Binding bindingToDead() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("#").noargs(); }}
@Componentpublic class DeadSender { @Resource private AmqpTemplate amqpTemplate; public void send() { String context = "dead msg:" + new Date().getTime(); MessageProperties properties = MessagePropertiesBuilder .newInstance() //设置过期时间为10秒 .setExpiration("10000") .build(); Message message = MessageBuilder .withBody(context.getBytes()) .andProperties(properties) .build(); //发送到直接交换机上 amqpTemplate.convertAndSend(RabbitMqConst.TEST_DLX_EXCHANGE, "hello", message); }}
test_dlx_queue:为正常队列,发送消息时,指定10秒钟过期,因为没有开消费者,所以就自动转发到了dlx_queue上面
7、延时队列 7.1 下载插件wget https://objects.githubusercontent.com/github-production-release-asset-2e65be/924551/bdcc937b-781e-4952-b2e2-a66c8aa94336?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20220215%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20220215T015044Z&X-Amz-Expires=300&X-Amz-Signature=5f16f24403f3cba33a2e4a99f4eeef41bf3cb5ca5c05bd59468e4f5fd9e8b8f3&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=924551&response-content-disposition=attachment%3B%20filename%3Drabbitmq-server-3.9.8-1.el7.src.rpm&response-content-type=application%2Foctet-stream
找到服务安装路径下面的plugins路径,将下载的 rabbitmq_delayed_message_exchange 插件拷贝进去
mv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
启动:
7.2 延迟队列配置rabbitmq-plugins enable rabbitmq_delayed_message_exchange
@Configurationpublic class DelayConfiguration { @Bean public Exchange delayExchange() { Map
@Componentpublic class DelaySender { @Resource private AmqpTemplate amqpTemplate; public void send(Integer delay) { String context = "delay msg:" + new Date().getTime(); MessageProperties properties = MessagePropertiesBuilder .newInstance() .build(); Message message = MessageBuilder .withBody(context.getBytes()) .andProperties(properties) .build(); //发送到直接交换机上 amqpTemplate.convertAndSend(RabbitMqConst.DELAY_EXCHANGE, "delay.test", message, p -> { //进行消息拦截处理,给消息设置延时时间 p.getMessageProperties().setDelay(delay); return p; }); }}
7.3 消费者@Componentpublic class DelayReceiver { @RabbitHandler @RabbitListener(queues = RabbitMqConst.DELAY_QUEUE) public void process(Message message, Channel channel) throws IOException { System.out.println("监听到延时数据,进行处理:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }}