欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

学RabbitMQ看这一编文章就完事了

时间:2023-05-05

一、引言

   写了n天,还是没写完,后面还有分布式事务实战,下次一定写完~

   手下留情~

   多多指教~


二、目录

1.1 消息队列协议 

01.什么是协议 ?

02.网络协议三要素

03.AMQP协议 

 1.2 消息队列的持久化

 1.3 消息的分发策略

 1.3 消息队列高可用和高可靠

01.什么是高可用和高可靠 ?

02、集群模式1---Master-slave 主从共享数据的部署方式

03.集群模式2 - Master  - slave 主从同步部署方式

03.集群模式3 - 多主集群同步部署模式

 1.4 RabbitMQ安装和部署

1.5 RabbitMQ的核心组成部分 

核心概念:

RabbitMQ的运行流程

RabbitMQ支持消息分发的模式

1.简单模式功能实现

2.工作队列 Work Queues

3.发布订阅模式

4.路由模式Routing

5.主题模式

1.6 RabbitMQ高级--过期时间TTL

01.什么是过期时间 ?过期时间是干什么的呢 ?

02.应用场景:

03.代码实现:

1.7 RabbitMQ高级--死信队列

什么是死信队列 ?

消息变死信可能以下原因: 

实现思路:

核心代码实现: 


  

1.1 消息队列协议 

01.什么是协议 ?

 我们知道消息中间件负责数据的传递,存储,分发和消费三个部分组成,数据的存储和分发的过程中肯定要遵循某种约定俗称的规范,你是采用TCP/IP,UD协议还是其他的自己去构建等,而这些约定俗成的规范称之为"协议"。

所谓协议是指:

1.计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。

2.和一般的网络应用程序不同的是,它主要负责数据的接受和传递,所以性能比较高。

3.协议对数据格式和计算机之间交换数据都必须严格遵守规范。

02.网络协议三要素

列如 : http请求协议

a.语法:http规定了请求报文和响应报文的格式

b.语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是 post/get 请求)

c.时序:一个请求对应一个响应。(一定先有请求再有响应,这个时序)。

小结:

         协议是在TCP/IP协议基础之上构建的一种约定成俗的规范和机制、他的主要目的可以让客户端进行沟通和通讯。并且这种协议下规范必须具有持久性、高可靠性、高可用性能。

03.AMQP协议 

简单概述:

     AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

特点:

1.分布式事务支持

2.消息的持久化支持。

3.高性能和高可靠的消息处理优势。 

 1.2 消息队列的持久化

简单来说就是将内存中的数据写入磁盘,防止服务器重启数据丢失。使数据能永久保存。

 1.3 消息的分发策略

MQ消息队列角色:

    a.生产者:

    b.存储消息:

    c.消费者:

 主要有三种分发的策略:

发布订阅:

消息推送到消息队列,假如消息队列有100条数据,有三台服务器,每台服务器都会接收到100条数据。

轮询分发:

消息推送到消息队列,假如消息队列有100条数据,有三台服务器,不管服务器的性能如何,每台服务器都会接收到99条数据,剩下的一条随机分发给一台服务器。 * 尽量保证每台服务器得到的条数都一样。 

 公平分发:

消息推送到消息队列,假如消息队列有100条数据,有三台服务器,性能高的多获取数据。反之,性能低的服务器少获取数据。 

 1.3 消息队列高可用和高可靠

01.什么是高可用和高可靠 ?

高可用:在指产品在规定的条件和规定的时刻或者时间内处于可执行规定功能状态的能力.

高可靠:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错率及其低,称之为:"高可靠".

02、集群模式1---Master-slave 主从共享数据的部署方式

注意 :这里的数据库就是个消息存储,不是真实的数据库。

说明:生产者发送消息到master节点,所有都连接这个消息队列共享这块数据区域,master节点负责写入,一旦master节点挂掉,slave节点可以继续使用。从而形成 "高可用"。 

03.集群模式2 - Master  - slave 主从同步部署方式

说明:这种模式写入消息同样在Master节点上,但是主节会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类似。这样可以达到负载均衡的效果。 

         

03.集群模式3 - 多主集群同步部署模式

 说明:和上面的区别不是很大,但是它的写入可以往任意节点去写入。

 1.4 RabbitMQ安装和部署

 参考博客:

(20条消息) rabbitmq安装包部署erlang环境安装_奔跑的菜鸡-CSDN博客_rabbitmq 安装包

1.5 RabbitMQ的核心组成部分 

注意 :交换机(exChange)一定是有的,如果没有指定交换机,一定是使用的默认交换机(AMQP default)。 

核心概念:

Server : 又称为Broker,接受客户端连接,实现AMQP实体服务。安装rabbitmq-server.

Connection : 连接,应用程序与Broker 的网络连接 TCP/IP 三次握手和四次挥手.

Channel : 网络信道,几乎所有的操作都在Channel中进行,Channel是进行信息读写的通道,客户端可以多个Channel,每个Channel代表一个会话任务。

Message : 信息,服务与应用程序之间传送的数据,由于Properties 和 Body 组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则是消息体的内容。

Virtual host : 虚拟地址,由于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有多个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange.

Exchange : 交换机,接受消息,根据路由键发送消息绑定的队列。(不具备消息存储的能力).

Bindings : Exchange 和 Queue 之间的虚拟连接,Biding中可以保护多个 routing key。

Routing key :是一个路由规则,虚拟机可以用来指定发送消息的队列。

Queue :  队列,也可以成为Message Queue,消息队列,保存消息并将他们发送给消费者.

RabbitMQ的运行流程

RabbitMQ支持消息分发的模式

1.简单模式功能实现

注:P 代表生产者 ,C 代表消费者,中间代表的是队列。

生产者

生产者实现思路:

1.创建连接工厂ConnectionFactory(),往里面设置服务器ip,端口号,用户名和密码,还有虚拟机节点。

2.创建Connection连接对象

3.通过连接对象获取通道channel

4.使用channel创建队列queue

5.使用通道channel 向队列中发送消息

6.关闭通道和连接

代码:

public class Producer { public static void main(String[] args) { // 1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.70.97.167"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2.创建Connection connection = factory.newConnection("生产者"); // 3.通过连接获取通道Channel channel = connection.createChannel(); // 4.通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接受消息 String queueName = "队列12"; channel.queueDeclare(queueName,true,false,false,null); // 5.准备发送消息内容 String message = "Hello World!"; // 6.发送消息队列queue channel.basicPublish("",queueName,null,message.getBytes()); System.out.println("消息发送成功!!!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { // 7.关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8.关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}

消费者

消费者实现思路:

1.创建连接工厂ConnectionFactory(),往里面设置服务器ip,端口号,用户名和密码,还有虚拟机节点。

2.创建Connection连接对象

3.通过连接对象获取通道channel

4.使用通道Channel创建的队列queue

5.创建消费者监听队列,读取消息.

6.关闭通道和连接

代码 

public class Consumer { public static void main(String[] args) { // 1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.70.97.167"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2.创建Connection connection = factory.newConnection("生产者"); // 3.通过连接获取通道Channel channel = connection.createChannel(); // 4.使用通道Channel创建的队列queue String queueName = "队列1"; // 5.创建消费者监听队列,读取消息. channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(" 接受的消息 " + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败~~~~~~"); } }); System.out.println("开始接收消息了!!"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { // 7.关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8.关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }

2.工作队列 Work Queues

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
  主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配; 

a.轮询模式:一个消费者一条,按均分发。

代码演示: 

生产者 

public class Producer { public static void main(String[] args) { // 1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.70.97.167"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2.创建Connection connection = factory.newConnection("生产者"); // 3.通过连接获取通道Channel channel = connection.createChannel(); // 4.通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接受消息 String queueName = "wb"; channel.queueDeclare(queueName,true,false,false,null); // 5.准备发送消息内容 String message = "Hello World!"; // 6.发送消息队列queue for (int i = 0; i < 10 ; i++) { channel.basicPublish("",queueName,null,message.getBytes()); } System.out.println("消息发送成功!!!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { // 7.关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8.关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}

消费者1 

public class Consumer1 { public static void main(String[] args) { // 1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.70.97.167"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2.从工厂中获取连接 connection = factory.newConnection("消费者work1"); // 3.通过连接获取通道Channel channel = connection.createChannel(); // 4.使用通道Channel创建的队列queue String queueName = "wb"; // 6: 定义接受消息的回调 Channel finalChannel = channel; // 5.创建消费者监听队列,读取消息. finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("work1接受的消息 " + new String(message.getBody(), "UTF-8")); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败~~~~~~"); } }); System.out.println("work1开始接收消息了!!"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { // 7.关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8.关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}

 消费者2

public class Consumer2 { public static void main(String[] args) { // 1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.70.97.167"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2.从工厂中获取连接 connection = factory.newConnection("消费者work2"); // 3.通过连接获取通道Channel channel = connection.createChannel(); // 4.使用通道Channel创建的队列queue String queueName = "wb"; // 6: 定义接受消息的回调 Channel finalChannel = channel; // 5.创建消费者监听队列,读取消息. finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("work2接受的消息 " + new String(message.getBody(), "UTF-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败~~~~~~"); } }); System.out.println("work2开始接收消息了!!"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { // 7.关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8.关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}

结果:

b、公平分发:根据消费者的能力分发,消费能力快的多分发,消费能力慢的少分发,能者多劳。

注意:生产者代码都一样,这里就不重复展示了,可以直接复制上面的即可。 

消费者1

public class FairConsumer1 { public static void main(String[] args) { // 1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.70.97.167"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2.从工厂中获取连接 connection = factory.newConnection("消费者work1"); // 3.通过连接获取通道Channel channel = connection.createChannel(); // 4.使用通道Channel创建的队列queue String queueName = "qq"; // 6: 定义接受消息的回调 Channel finalChannel = channel; // 公平分发指标一定要定义出来 速度快的消费者会多执行 每次执行一次 根据磁盘Cpu内存去决定 finalChannel.basicQos(1); // 5.创建消费者监听队列,读取消息. finalChannel.basicConsume(queueName, false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { System.out.println("work1接受的消息 " + new String(message.getBody(), "UTF-8")); Thread.sleep(2000); // 消费者开启手动应答 finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败~~~~~~"); } }); System.out.println("work1开始接收消息了!!"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { // 7.关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8.关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}

消费者2

public class FairConsumer2 { public static void main(String[] args) { // 1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.70.97.167"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2.从工厂中获取连接 connection = factory.newConnection("消费者work2"); // 3.通过连接获取通道Channel channel = connection.createChannel(); // 4.使用通道Channel创建的队列queue String queueName = "qq"; // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); // 5.创建消费者监听队列,读取消息. finalChannel.basicConsume(queueName, false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { System.out.println("work2接受的消息 " + new String(message.getBody(), "UTF-8")); Thread.sleep(1000); // 消费者开启手动应答 finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败~~~~~~"); } }); System.out.println("work2开始接收消息了!!"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { // 7.关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8.关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}

 结果:

 小结:

在上面的结果当中,work2处理了更多的消息,实现了我们的公平分发模式。

公平分发:消费者一定要设置成手动应答。

消费一次接受一条消息:

finalChannel.basicQos(1);

关闭自动应答:

finalChannel.basicConsume(queueName, false,cosumer);

消费者开启手动应答代码: finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);

轮询模式:自动应答。

总结

(1)当队列消息较多时,我们通常会开启多个消费者处理消息;公平分发和轮询分发都是我们经常

使用的模式。

(2)轮询分发的主要思想是  "按均分配",不考虑消费者的处理能力,所有消费者均分;这种情况下,处理能力弱的服务器,一直都在处理消息,而处理能力强的服务器,在处理完消息后,处于空闲状态;

(3)公平分发的主要思想是"能者多劳",按需分配,能力强干的多。

注意:下面讲解的发布订阅模式,路由模式,主题模式。这些都是直由生产者推送到队列的,然后分发给消费者,我们结合Springboot来说明。

3.发布订阅模式

fanout: 发布模式订阅,给每一个队列都发送消息。C1和C2都会收到消息。 

图形界面实现:

1.添加个交换机

2.添加新的队列,需要多个队列进行操作

3.将交换机与队列进行绑定

4.发送消息 

代码实现: 

1.声明队列绑定交换机

@Configurationpublic class FanoutExChangebinding { // 声明注册交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout_Order_exChange",true,false); } // 声明队列 @Bean public Queue qqQueue(){ return new Queue("qq",true); } @Bean public Queue wxQueue(){ return new Queue("wx",true); } @Bean public Queue wbQueue(){ return new Queue("wb",true); } // 绑定关系 @Bean public Binding qqBingding(){ return BindingBuilder.bind(qqQueue()).to(fanoutExchange()); } @Bean public Binding wxBingding(){ return BindingBuilder.bind(wxQueue()).to(fanoutExchange()); } @Bean public Binding wbBingding(){ return BindingBuilder.bind(wbQueue()).to(fanoutExchange()); }}

2.发送消息

@Servicepublic class sendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessageService(String orderId){ rabbitTemplate.convertAndSend("fanout_Order_exChange","",orderId); }}

3.Test

@SpringBootTestclass SpirijngbootRabbitmqApplicationTests { @Autowired private sendMessageService sendMessageService; @Test void fanoutTest() { String orderId = UUID.randomUUID().toString(); sendMessageService.sendMessageService(orderId); System.out.println(" 发送成功!! " + orderId); }}

 消息已经发送到队列了,那我们怎么去消费掉呢 ?别急,看下面代码~

结果:

4.路由模式Routing

说明:direct是交换机的类型

 direct: 路由模式,简单点来说,就是在fanout模式上根据路由key添加个条件筛选。

1.添加交换机

2.绑定队列并添加路由key,绑定后我们发送消息就可以不填写队列名直接通过路由key去发送了

只有添加了sms路由key的能收到消息.

代码实现: 

1.声明队列绑定交换机

@Configurationpublic class DirectExChangebinding { // 声明注册交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("direct_Order_exChange",true,false); } // 声明队列 @Bean public Queue qqQueueDirect(){ return new Queue("qq",true); } @Bean public Queue wxQueueDirect(){ return new Queue("wx",true); } @Bean public Queue wbQueueDirect(){ return new Queue("wb",true); } // 绑定关系 @Bean public Binding qqBingdingDirect(){ return BindingBuilder.bind(qqQueueDirect()).to(directExchange()).with("11"); } @Bean public Binding wxBingdingDirect(){ return BindingBuilder.bind(wxQueueDirect()).to(directExchange()).with("11"); } @Bean public Binding wbBingdingDirect(){ return BindingBuilder.bind(wbQueueDirect()).to(directExchange()).with("12"); }}

2.发送消息

@Servicepublic class sendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendDirectMessageService(String orderId){ rabbitTemplate.convertAndSend("direct_Order_exChange","11",orderId); }}

3.Test

@SpringBootTestclass SpirijngbootRabbitmqApplicationTests { @Autowired private sendMessageService sendMessageService; @Test void directTest() { String orderId = UUID.randomUUID().toString(); sendMessageService.sendDirectMessageService(orderId); System.out.println(" 发送成功!! " + orderId); }}

结果:

5.主题模式

说明:Topic也是交换机类型。

Topic: 主题模式,简单点来说,就是在fanout模式上根据路由key添加个模糊匹配。(#:代表一个或多个层级,也可以没有,有 "." 表示每一级。*:代表一个层级,必须要有一级)。

代码实现:

1.声明邦列绑定交换机

@Configurationpublic class TopicExChangebinding { // 声明注册交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange("topic_Order_exChange",true,false); } // 声明队列 @Bean public Queue qqQueueTopic(){ return new Queue("qq",true); } @Bean public Queue wxQueueTopic(){ return new Queue("wx",true); } @Bean public Queue wbQueueTopic(){ return new Queue("wb",true); } // 绑定关系 @Bean public Binding qqBingdingTopic(){ return BindingBuilder.bind(qqQueueTopic()).to(topicExchange()).with("com.#"); } @Bean public Binding wxBingdingTopic(){ return BindingBuilder.bind(wxQueueTopic()).to(topicExchange()).with("#.order.#"); } @Bean public Binding wbBingdingTopic(){ return BindingBuilder.bind(wbQueueTopic()).to(topicExchange()).with("*.com"); }}

2.发送消息

注意:因为这里的Routing key 是 a.com,在我们上面匹配的话,只会匹配到一个队列wb

@Servicepublic class sendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendTopicMessageService(String orderId){ rabbitTemplate.convertAndSend("topic_Order_exChange","a.com",orderId); }}

3.Test

@SpringBootTestclass SpirijngbootRabbitmqApplicationTests { @Autowired private sendMessageService sendMessageService; @Test void TopicTest(){ String orderId = UUID.randomUUID().toString(); sendMessageService.sendTopicMessageService(orderId); System.out.println(" 发送成功!! " + orderId); }}

结果:

小结:

rabbitmq发送消息一定有一个交换机,如果没指定,绑定的就是默认的交换机。

 

1.6 RabbitMQ高级--过期时间TTL

01.什么是过期时间 ?过期时间是干什么的呢 ?

过期时间TTL表示可以对消息设置预期时间,在这个时间段内消息队列中的消息可以被消费者接受获取,过了这个时间段内消息就会被自动删除。

02.应用场景:

  比如:订单下了订单未支付,10分钟内自动取消。

实现方式:   

 1.第一种方法是通过队列属性设置,队列中所有的消息都有相同的过期时间。

 2.第二种方法是对消息单独设置,每条消息的TTL(过期时间)不同。

重点 : 如果上面两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个值为准。消息在队列的生存时间一旦超过设置的TTL值,就会被投递到死信队列(简称为: dead message),消费者将再无法接受到这条消息。

说得好不如代码写得好,我们直接上代码~ 

03.代码实现:

第一种:

给队列设置过期时间。

 在声明队列的时候这样去设置,记得仔细看看代码中的注释~

@Bean public Queue ttl_Queue(){ Map map = new HashMap<>(); map.put("x-message-ttl",5000); return new Queue("ttl_Queue",true,false,false,map); }

图形化界面实现:

设置成功: 

第二种:

给消息设置过期时间。

在给队列发送消息的时候,去设置每条消息的过期时间。(单位:毫秒)

@Servicepublic class sendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendTTl2MessageService(String orderId){ // 给消息设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ @Override public Message postProcessMessage(Message message) throws AmqpException { // 这里就是字符串 message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("UTF-8"); return null; } }; rabbitTemplate.convertAndSend("ttl_direct_exChange","11",orderId); }}

1.7 RabbitMQ高级--死信队列

什么是死信队列 ?

      DLX,全称为Dead-Letter-Exchange,可以称为 "死信交换机",也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message) 之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列称之为死信队列。

消息变死信可能以下原因: 

a.消息被拒绝

b.消息TTL过期

c.队列达到最大长度

实现思路:

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数(可以参考RabbitMQ之mandatory和immediate)的功能。

核心代码实现: 

b.消息TTL过期

@Bean public Queue ttl_Queue(){ Map map = new HashMap<>(); // 1.设置过期时间 map.put("x-message-ttl",50000); // 绑定死信队列交换机 map.put("x-dead-letter-exchange","dead_Direct_exChange"); return new Queue("ttl_Queue",true,false,false,map); }

 c.队列达到最大长度

@Bean public Queue ttl_Queue(){ Map map = new HashMap<>(); // 2.队列超过最大长度 map.put("x-max-length",5); // 绑定死信队列交换机 map.put("x-dead-letter-exchange","dead_Direct_exChange"); return new Queue("ttl_Queue",true,false,false,map); }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。