欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

rabbitmq

时间:2023-05-02

1.功能分同步和异步

2.分布式同步调用涉及的问题

1.同步正常,同步时间长2.同步异常(调用过程出现错误) 时间更长,对错误的处理更复杂 (前提,出现错误,一定要尽快处理错误,调用尽快返回,不然后占用线程资源,导致整个调用链 不释放线程资源,导致系统雪崩) 同步的通病就是耗时,错误出现更耗时,而且对错误的处理也复杂 1.一旦出现错误(一定要以 响应时间 优先考虑)1.出现错误,可以先重试几次(若考虑时间可以不重试),若还不成功,将错误信息 存到 错误表中 尽快先响应出来,保证整个分布式调用的尽快返回 2.错误的种类1.重要的(例如,数据库的操作已经做了,其他功能错误,此时响应客户端,用户重新操作) 这种必须,涉及分布式事务回滚,然后响应客户端重新操作 这种不仅 对操作复杂,而且耗时 2.不重要的,再重试几次还不成功,错误信息 存 错误表后,就可以直接返回了,不用回滚用户得到提示再次操作,操作不复杂,但是耗时

3.分布式异步调用

1.若,一个调用链的,几次调用没有必然的联系(下一个调用,必须用到上一次调用返回的数据) 那可以异步调用 2.异步调用(异步调用的时机问题) 例子:一个调用链,A将数据存到数据库,B将数据存到Es,C重写静态页面 1.一个调用链,直接异步调用(A,B,C在这个调用链中,直接异步调用,ABC都重要,都得立即执行)例如:A将数据存到数据库,B将数据存到 es中,C重写静态页面,再这个调用链中A将数据添加好后,直接响应页面,B,C异步调用不用响应页面这是三次 异步调用,在同一时刻一起执行(只是异步调用的时机问题)2.(不再同一时刻异步调用,即有的功能不那么重要,不用立即做)例如:A,B(调用没有必然联系),A是需要即刻执行并响应页面的,而B什么时候执行都没有关系(例如B是,将数据添加到es,此次调用链,不做这个事,放在后面做也行)这时候,就涉及到,B如果后面某天单独处理(例如批处理),那数据怎么保存下来,B在后面可以拿到这个数据,进行存 es这个操作呢?可以将数据,存到redis , mq(专门做这个事情),然后B再之后,要异步执行的时候,从mq取数据即可

由上面可知,mq用在 异步执行,并且不在同一时刻的,但是在此次业务得做的操作,但是又不太重要的操作(可以放在之后的专门某个时间统一处理)这样的情况,用mq保存数据,方便 那个此刻可以不做,但是之后必须得做的操作,可以从mq中获取数据 使用redis存消息需要考虑的

1、这个公共的任务池,会不会宕机?会不会服务不可用?如何解决?2、你一定确信消息发送到任务池了吗?3、如果在向任务池发送任务失败该如何处理?4、如果重试的时候发送成功了,但是实际上发送了多次,更新倒排索引服务和更新静态页面服务会不会重复执行?5、如果重复执行,最终结果会不会不一样?

分布式 异步 通信模式(mq)

优点:系统间解耦并具有一定的可恢复性支持异构系统,下游通常可并发执行(多个消费者集群并发消费消息,不能重复消费消息,按顺序消费),系统具备弹性服务异步解耦流量削峰填谷(某时刻并发请求太高,先将消息存在mq中,后面再异步消费,保证系统快速度过高并发的情况) 缺点:消息中间件存在一些瓶颈和一致性问题,对于开发来讲不直观且不易调试,有额外成本。

使用异步消息模式需要注意的问题:

1、哪些业务需要同步处理,哪些业务可以异步处理?2、如何保证消息的安全?消息是否会丢失,是否会重复?3、请求的延迟如何能够减少?4、消息接收的顺序是否会影响到业务流程的正常执行?5、消息处理失败后是否需要重发?如果重发如何保证幂等性?6、幂等性:因消息处理失败,重新向mq中发数据,消费者再次消费(这种情况不管发生多少次,都不会影响最后的结果)例如:累加操作,消费者消费消息做累加,如果重新向mq中发送消息,消费者再次消费,会导致每次结果不一样,这就无法保证幂等性

消息中间件的功能

异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件。

消息中间件概念

消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。

消息中间件就是在通信的上下游之间截断:break it,Broker

然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统

异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件。

自定义消息中间件

1.BlockingQueue(阻塞队列)是java中常见的容器,在多线程编程中被广泛使用。当队列容器已满时生产者线程被阻塞,直到队列未满后才可以继续put;当队列容器为空时,消费者线程被阻塞,直至队列非空时才可以继续take。
我们用阻塞队列作为共享资源

public class Producer implements Runnable { private final BlockingQueue blockingQueue; public Producer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (true) { try { Thread.sleep(200); if (blockingQueue.remainingCapacity() > 0) { KouZhao kouZhao = new KouZhao(UUID.randomUUID().toString(), "N95"); blockingQueue.add(kouZhao); System.out.println("我在生产口罩,当前库存是:" + blockingQueue.size()); } else { System.out.println("我的仓库已经堆满了" + blockingQueue.size() + "个口罩,快来买口罩啊!"); } } catch (InterruptedException e) { e.printStackTrace(); } } } }

public class Consumer implements Runnable {private final BlockingQueue blockingQueue; public Consumer(BlockingQueue blockingQueue) {this.blockingQueue = blockingQueue; }@Overridepublic void run() {while (true) {try {Thread.sleep(100);long startTime = System.currentTimeMillis(); // 获取开始时间KouZhao kouZhao = blockingQueue.take();long endTime = System.currentTimeMillis(); // 获取结束时间System.out.println("我消费了口罩:" + kouZhao + ", 等到货时我阻 塞了" + (endTime - startTime) + "ms");} catch (InterruptedException e) { e.printStackTrace(); } } } }

现在重点是,如何在producer 和 comsumer 两个线程中 拥有共享资源

首先这两个run线程,现在能获得的资源,producer和consumer对象内部的blockingQueue,
因为虽然是开启了两个线程(run是线程标识),但是线程的第一个方法栈,也是调用的 这两个对象的,实例方法run(而实例方法里面是可以获得,对象的实例属性的)

等于 run线程(就是一个标识,一个栈空间) 和 run线程里的 producer的 实例run方法 的区别

所以 只要 producer和consumer对象内部的blockingQueue,指向的是同一个 blockingQueue 即可
那么只要在 创建 producer和consumer对象的时候,给他们传同一个blockingQueue 即可

public class App {public static void main(String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue<>(20);Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); //开启一个run线程(线程的标识是run),主方法栈是 producer的run方法new Thread(producer).start(); Thread.sleep(20000); //开启一个run线程(线程的标识是run),主方法栈是 producer的run方法new Thread(consumer).start(); } }

上述代码放到生产环境显然是不行的,比如没有集群,没有分布式,玩儿法太单一,不能满足企业
级应用的要求。。。

比如:
消息有没有持久化?
怎么确定消息一定能发送成功?
怎么确定消息一定能被消费成功?
高并发下的性能怎么样?
系统可靠吗?
有没有Pub/Sub模式?
有没有考虑过限流?
。。。

主流消息中间件及选型

1.作为消息队列,要具备以下几个特性:1、消息传输的可靠性:保证消息不会丢失。2、支持集群,包括横向扩展,单点故障都可以解决。3、性能要好,要能够满足业务的性能需求。2.吞吐量和延迟率的区别1.吞吐量是指,1s内 (或者说一次)中间件能处理的消息2.延迟率,是指响应率 简单来说,mq一次要积累多少消息是指的吞吐量,一次积累完成后,然后响应处理,这个时间是延迟率 通常来说吞吐量越大,延迟率越高,吞吐量越小,延迟率越低3.中间件支持的 消费手段:推/拉1.推:中间件消息队列中只要有消息,事件源将消息(事件对象)传给 事件源管理的 监听器(消费者)list,监听器 拿到这个事件对象,进行消费 2.拉:消费者,主动去 mq中 拉消息模式4.RabbitMQ(吞吐量低,延迟率也低)优点:1.轻量级,快速,部署使用方便2.支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。3.RabbitMQ的客户端支持大多数的编程语言。缺点:1.如果有大量消息堆积在队列中,性能会急剧下降2.RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应 用要求高的性能,不要选择RabbitMQ。 3.RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。5.RocketMQ(吞吐量比较高,延迟率比较高)优点1.RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。性能,稳定性可可靠性没的说。2.java开发,阅读源代码、扩展、二次开发很方便。3.性能比RabbitMQ高一个数量级,每秒处理几十万的消息。(吞吐量高,延迟比rabbitmq高,但是不是特别高)缺点:跟周边系统的整合和兼容不是很好。6.Kafka(吞吐量极高,延迟极高)优点1.Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。2.跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持Kafk3.Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。4.Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的 性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息5.如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。缺点但是由于是异步的和批处理的,延迟也会高,不适合电商场景。

消息中间件应用场景(就是来解决高并发写,涉及高并发写操作,我们首先想的就是异步调用,提高响应速率,尽快空出线程资源,而这最好的解决办法就是 调用链 采用异步调用,A尽快响应,B,C不重要的,后面再去异步调用即可)而,BC异步调用的消息,就存在mq中

总而言之 消息中间件,就是来解决 高并发写 产生的 异步调用 而 消息存在哪,可以再后来被 异步调用的服务获取到 的问题

1.系统应该如何应对 读高并发1.使用缓存策略将请求挡在上层中的缓存中2.能静态化的数据尽量做到静态化3.加入限流(比如对短时间之内来自某一个用户,某一个IP、某个设备的重复请求做丢弃处理)2.系统应该如何应对 写高并发mq异步批处理,流量削峰3.消息队列的作用:1.削去秒杀场景下的峰值写流量——流量削峰2.通过异步处理简化秒杀请求中的业务流程——异步处理3.解耦,实现秒杀系统模块之间松耦合——解耦其实这三个功能,mq在执行消息的读推的时候,就全部解决了,这三个功能只是从不同角度解释了mq的作用

消息中间件的 执行过程

消费者向mq中发送消息
mq消息队列有消息后
事件源(mq) 向事件源(mq)中注册的 消费者(监听器) 发送 事件对象(消息)
然后消费者(监听器)拿到消息消费即可

消费者的消费(监听器执行监听/消费方法) 是异步调用的(也就是说监听器拿到消息,异步执行它的消费方法)

消息中间件的注意事项

1.哪些业务必须同步处理,哪些业务可以异步处理1.同步处理1.调用链之间,前一个调用的结果,下一个调用需要用(即调用之间关系紧密)2.异步处理1.调用链之间,前一个调用的结果,下一个调用不需要用(即调用关系不大)2.在 调用关系不大的情况下,既可以同步也可以异步1.什么时候最好异步(可以异步的情况下,若并发量大,或者有些可以业务不重要,或者想要解耦)1.并发量大,A业务完成后,向 mq 中 写数据,然后响应,BC后面再异步从 mq 中取消息,完成业务逻辑2.并发量不大,但是BC业务对 用户来说不重要,可以后面异步处理,也可以提高响应效率3.想要将调用之间解耦合,前提是可以进行异步调用(即 调用之间关系不大)

消息中间件中存储消息的容器,有队列(点对点通信),有topic(发布订阅通信时使用),这只是 JMS 规范中的,rabbitmq中的就是 队列 rabbitmq采用 推(通知)的模式,即只要容器中有某条消息,mq(事件源) 就会发送给 对应(认识这条消息/监听这个事件对象的,一般都是通过 消息的 ID来认识的,就是说消费者只会消费 他认识的 ID的消息)消费者(监听器)的 handlerEvent(消息)的方法执行 消费消息,就是 观察者模式的 应用 生产者和消费者的 生产和消费消息 都是相互的 怎么理解

生产者<=> mq <=>消费者
首先生产者 向 mq中的 指定 destination 写消息(路由key - 交换机 -绑定key -queue)

mq 通知 (消费者 开启 channel 绑定 一个 queue)的消费者,消费消息

消费者消费完消息,通常都要 响应一个 响应消息给mq的指定的 destination 代表消费者已经消费完消息(路由key - 交换机 -绑定key -queue)

mq这时 又会 通知 (绑定 这个 queue)的生产者,消费消息

所以说,mq其实就是 一直在 不停的 执行观察模式,一直在 给 监听器的 方法传消息执行(监听器只执行 自己认的 消息/事件对象)

而生产者和消费者,都得生产和消费消息(只是目的不停,生产者是自动生产消息,被动消费消息,而消费者是主动消费消息,被动生产消息)

(这个消费者主动消费消息,不是说消费者去 主动 向 mq中 拉取消息,而是说,消费者本职工作就是消费消息,对于这个来说消费者是主动的,而被动生产消息是说,消费者的本职不是生产消息,是得给mq一个通知消息证明我消费者已经消费完消息了,对于这个方面是被动的) 生产者 同理

JMS经典模型(简单来说,操作mq的接口规范,每个mq厂商都得按这个java规范,设计实现类)

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。

JMS消息:(mq消息的 对应的 java实现)

消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。
报文头包括消息头字段和消息头属性。字段是JMS协议规定的字段,属性可以由用户按需添加。

JMS报文头全部字段

以上都是JMS规范,rabbitmq有更具体的实现,也加了很多其他的东西,不能把 JMS 和 rabbitmq的具体实现混淆,包括我们说的JMS 主题模式,rabbitmq中就是 queue,还是要以 rabbitmq为主,例如下面说的工作模式,JMS就2种,但是 rabbitmq 在这两种的基础上 有了很多 工作模式

rounting-key不是由 message设置的,即不是message中的,(它是,rabbitmq的 特别的 工作模式 中用到的,用来 绑定交换机和队列,以及 消息能通过 key,发送到指定的 队列中去)
而是生产者首先,将交换机和queue建立一个连接,用channel传过去一个 rounting-key,建立起连接
然后 生产者发消息的时候通过 channel,也传过去一个 rounting-key这样就能找到,queue

消息字段的种类和解释(重点,消息可以设置消息字段的值 让消息带一些属性,但是生产者 发送方 也可以指定JMS 消息字段和值)即 message可以携带 JMS消息字段,producer也可以指定 JMS消息字段(以谁为准,等会考虑),这个也是 JMS规范和,rabbitmq的具体实现和用法,以rabbitmq的为准

JMSDestination:可以是 具体的一个队列,可以是 具体的一个topic
JMSReplyTo: 也是destination,只是这个是,消费者消费完消息,并向 mq 发一条 响应消息的时候,用到的 destination(可以是一个具体的队列,也可以是一个具体的topic)

JMSDeliveryMode(投递模式): 发送的模式,可以是点对点(那 destination就是 队列),可以是发布订阅模式(destination是 topic)

上面两个是相对应的

JMSTimestrap(时间戳):从消息封装完成开始发送 到 成功发送的时间间隔 就是 时间戳
因为,这个时间段,可能网络堵塞等原因,导致,开始发送 到 发送成功 有一段时间间隔

JMSMessageID:消息的ID,生产者发消息,消息都带唯一ID

JMSCorrelationID: 这个也是消息的ID,只是这个是,消费者拿到 ID消息,并且消费者再向mq中指定的 destination 回一条 响应消息的时候,需要ID,那这个ID,就可以来自于 消费的消息的 ID,mq中有了这条消息,通知消费者消费(这时一般 发这个消息的生产者,可以认识 这个ID的消息,并消费)

JMSExpiration: 消息的过期时间,即在发送消息的那一刻(不是说发送成功) 开始计算,超过这个时间,消息过期(特别是超过这个时间,消息都还没有成功发送到mq上,那消费者就完全 消费不了这条消息了)

生产者和消费者应该成绑定状态,即通过 mq容器绑定(生产者 想要 生产的 消息 发给 指定的 消费者消费,那么就发给 消费者绑定的指定队列中) 并且 生产者 和 消费者 如果都作为 消费者状态的话,都得绑定 一个 queue,这样 mq 中的 queue 才知道 给哪个 消费者发 自己的 消息 Java消息服务应用程序结构支持两种模式(这个是 JMS的,不是rabbitmq的,只是说rabbitmq的 工作模式 是以这个为依据的)

1、在点对点或队列模型下(最重要的特点,一条消息只能发给一个消费者,即不是 所有消息 同步到 所有消费者) 一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费 者的队列,并直接将消息发送到消费者的队列,概括为:1.一条消息只有一个消费者获得2.生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行状态3.每一个成功处理的消息要么自动确认,要么由接收者手动确认。1.MQ queue 中的 消息 有两种1.未消费2.未确定2.一条消息只有一个消费者获得即使mq中的消息还没被确定(清除),也不能发给其他 消费者就算重发给消费者,也只能发给 之前的那个消费者3.queue 采用 轮询的方式(负载均衡)发给 绑定这个 queue 的消费者4.手动确认和自动确认(不是消费者 向 mq 中 发 响应消息)1.手动确认 消费者消费完消息,写段代码响应mq, 确认已消费 或确认没收到消息 或 确认没消费手动确认如果没收到,mq可以再 发一次 消息给 你(注意:一个消息 对应 一个消费者,发的都是上一次的同样的消息)可以避免 消息的 丢失2.自动确认queue 只要发送完 消息,就自动确认,然后清除消息(不管你消费者消费与否,收到消息与否)所以自动确认,存在 消息丢失问题

2.发布订阅模式(广播)1、topic 给 消费者 发布消息是 广播的 方式(同样 消费者 也得 绑定 topic) 一个 topic 给 绑定它的 所有消费者 发 所有消息(即每个消费者都可以收到 topic 中的 所有消息) 2.topic中的消息 有确认机制吗(会清除吗)?3.在发布者和订阅者之间存在时间依赖性。1.发布者需要建立一个主题,以便客户能够订阅。2.订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。3.对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。4.为什么 订阅者 要一直上线,否则可能会丢失消息1.点对点 不存在 不上线就丢失消息这一说,因为不是 每个消费者都得 收到所有消息的 点对点 看中的 是 queue 中的 消息 只要能被 消费者 分摊 消费完即可,这个消费者不上线 其他消费者消费了 这些消息 就可以了,消费完 就清除了,也不存重发问题 2.发布订阅模式,看中的 是 每个消费者 都得收到 所有消息,你如果不上线,那消息 广播的时候 你就收不到,等你再次 上线,可能消息已经没有了,或者不再广播了3.问题:1.topic 广播完 消息,消息有确认机制,会被清楚吗?2.如果消息还有,那消费者上线后,为什么 不能 重发给 消费者呢,这样消费者不会因为没有上线而丢失消息了

点对对模式和topic模式的相同和区别

1.最大的区别就是,queue是 将消息 轮询给 消费者,而 topic 是把 all message 广播给 所有消费者 其次就是,topic存在消息丢失问题(消费者没上线,无法广播给 它) 2.还有一点不太确定的:topic 存在 确认机制吗?3.相同点:有很多,消费者都得绑定容器,消费者和生产者都没有关系,

消息传递的方式(消息的类型 持久化 or 非 持久化)

JMS有两种传递消息的方式。1.标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用缓存后再转送的机理投递。2.如果一个JMS服务下线,持久性消息不会丢失,等该服务恢复时再传递。默认的消息传递方式是非 持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。

JMS在应用(消费者)集群中的问题

问题1.生产中应用基本上都是以集群部署的。在Queue模式下,消息的消费没有什么问题,因为不同节点 的相同应用会抢占式地消费消息,这样还能分摊负载。 2.如果使用Topic广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操 作,这样就重复消费了。。。解决办法1.方案一:选择Queue模式,创建多个一样的Queue,每个应用消费自己的Queue。 弊端:浪费空间,生产者还需要关注下游到底有几个消费者,违反了“解耦”的初衷。2.方案二:选择Topic模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争。 弊端:对业务侵入较大,不是优雅的解决方法 3.ActiveMQ通过“虚拟主题”解决了这个问题。 生产中似乎需要结合这两种模式:即不同节点的相同应用间存在竞争,会部分消费(P2P),而不 同的应用都需要消费到全量的消息(Topic)模式。这样就可以避免重复消费。


AMQP协议和JMS规范的区别

amqp类似网络协议,所有mq都得支持这个协议才能完成交互 ,并且所有mq在设计的时候都得考虑这个 amqp协议,说白了,rabbitmq就是这个协议的实现

JMS是 java规范即 接口,只是java 操作 mq的规范

关于 路由key 绑定key 和 destination的关系

destination,是jms规范中的,rabbitmq中可以不用,但是如果用的话,就要用一种普通工作模式 即,不指定 交换机,,采用默认的 交换机,直接使用routingKey向队列发送消息,如果该 routingKey指定的队列存在的话而不用JMS规范中的 destination , 采用 rabbitmq中的,那有几种选择1.可以由 生产者 直接 指定 destination(而不是 message中携带)2.也可以不 指定 destination,直接通过,routing-key,交换机

queue和 交换机的 绑定 是用 绑定key 绑定的,交换机和队列通过 绑定key 会组成 一张 路由表 交换机和 queue 可以是 多对多的关系 一个mq server 有多个 virtual host(包含交换机,queue),我们操作mq ,都是在 virtual host的基础上 操作的,不是mq server的 基础上 与交换机 绑定的 可以是 queue(绑定key绑定),也可以是 另一个 交换机 AMQP中的概念

Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息。Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个queue中接收消息。Server:一个具体的MQ服务实例,也称为Broker。Virtual host:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtualhost通常包含多个Exchange、Message QueueExchange:交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。Routing key:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通常需要和具体的Exchange类型、 Binding的Routing key结合起来使用。Bindings:指定了Exchange和Queue之间的绑定关系。Exchange根据消息的Routing key和 Binding配置(绑定关系、Binding、Routing key等)来决定把消息分派到哪些具体的queue中。这依赖于Exchange类型。Message Queue:实际存储消息的容器,并把消息传递给最终的Consumer

AMQP 传输层架构

1 简要概述AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。我们假定有一个可靠的面向流的网络传输层(TCP/IP或等价的协议)。在一个单一的socket连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序列传输。**(这个和其他多线程应用不一样的是,其他是 多个连接 产生 多个线程 而mq是一个连接,包含多个线程)我们使用小的数据类型来构造数据帧,如bit,integer,string以及字段表。数据帧的字段做了轻微的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是AMQP)。我们假定AMQP会扩展,改进以及随时间的其他变化,并要求wire-level格式支持这些变化。2 数据类型AMQP 使用的数据类型如下:Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐Bits(统一为8个字节):用于表示开/关值。Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节Long strings:用于保存二进制数据块。Field tables:包含键值对,字段值一般为字符串,整数等。3 协议协商AMQP客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一个很有用的技术手段,因为它可以让我们断言假设和前置条件。在AMQP中,我们需要协商协议的一些特殊方面:1、 真实的协议和版本。服务器可能在同一个端口支持多个协议。2、 双方的加密参数和认证方式。这是功能层的一部分。3、 数据帧最大大小,通道数量以及其他操作限制。对限制条件的认同可能会导致双方重新分配key的缓存,避免死锁。每个发来的数据帧要么遵守认同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了“要么一切工作正常,要么完全不工作”的RabbitMQ哲学。协商双方认同限制到一个小的值,如下:1、服务端必须告诉客户端它加上了什么限制。2、客户端响应服务器,或许会要求对客户端的连接降低限制。4 数据帧界定TCP/IP是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:1、每个连接发送单一数据帧。简单但是慢。2、在流中添加帧的边界。简单,但是解析很慢。3、计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP的选择。

交换机 和 mq的工作模式,和JMS工作模式的区别

1.JMS工作模式就2种1.点对点1.生产者端:交换机 向一个 特定的 队列 发消息2.消费者端:queue 将消息 负载均衡轮询 发给 绑定的 消费者2.发布订阅1.生产者端:消息,经过交换机,发给所有 绑定交换机的 queue(不用 routing-key 绑定) 所有,queue都,绑定同一个 交换机2.消费者端:queue 将 消息 全部发送给 绑定的消费者2.rabbitmq的工作模式有 很多种,都是 依据这 JMS 规范 这2种来的,可能是组合来的

在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。用了哪个交换机,就有一种 rabbitmq的 工作模式 交换机类型的不同,就直接导致了工作模式的不同(一个交换机可以对应多种工作模式),工作模式的不同,就直接导致了,交换机和队列绑定方式的不同以及消息入队列以及消息出队列的方式的不同 交换机是原理,工作模式是在交换机原理下的具体设计思想的具体实现,即交换机确定了,那么 交换机将消息是发给指定队列,还是广播,queue将消息发送给绑定队列的方式是负载均衡还是广播 就确定了,而工作模式在 此基础上 可能就是 是一个队列还是 多个队列,消费者是一个还是多个,以及工作模式实际就是业务的具体考虑,我们关注 交换机的类型就行了

生产者,交换机,队列,消费者 四者之间的 绑定关系(这4者必须绑定,只是绑定的 方式有不同,特别是交换机和队列的绑定方式)

1.生产者绑定交换机(channel打开绑定 对应交换机,或者说向交换机 发消息)(可能有 RK,可能没有,根据交换机的类型不同,工作模式不同来的) 2.交换机绑定队列(可能有BK,可能没有,根据交换机的类型不同,工作模式不同来的) 3.队列绑定消费者 上面三种绑定,跟 channel概念区分开,channel是说我们可以通过这个通道,建立起生产者,交换机,队列,消费者4者的联系,一般说的绑定就是联系,通过 channel,我们写代码,去建立他们的绑定去(BK,RK是特殊情况,特定的交换机类型,才有),mq就是在 绑定的 基础上进行的,任何操作都先 谈绑定为依据考虑 所以我们要考虑的就两点(在绑定的基础上) 1.消息 入 队列的方式(指定队列,还是 所有队列)

消息都得先来交换机(不管是指定的还是 默认的)根据交换机的类型不同,可能将消息 发给 指定的 队列,或者发给所有绑定的队列(交换机绑定队列的方式的不同)

2.消息 出 队列的方式(广播还是分发)

队列绑定消费者,可能队列中的消息 赋值均衡发给绑定的消费者也可能,queue中的消息全部发给,所有消费者这也是交换机的不同,导致工作模式的不同

所有queue必须 和 交换机绑定,可能绑定时没有BK,而且queue 和 交换机绑定,的BK 可以设置多个 发送消息时,可以带 RK,也可不带(根据工作模式来),但是必须指定交换机名字("",就是指定默认交换机),因为消息都是交换机帮发给queue的,只是有的工作模式(交换机类型)不用 RK也没有BK RabbitMQ Exchange类型(声明交换机是否持久化,代表mq重启的时候,此类型交换机还是否存在)

RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。

交换机类型只有四个,但是我们可以创建多个交换机实例,即每个实例都是有你指定的名字和交换机类型组成的,下面说的默认交换机,就是mq自带的名字为""的direct类型的交换机实例 还有一个 默认的交换器实例(也是 direct类型的,只是通常不指定交换器的名字的时候使用,它的名字是""

1.和有名字的 direct交换机的区别1.我们知道,queue和交换机都得绑定的,如果你不手动绑定,默认mq会自动将此queue和 默认交换机实例绑定 绑定Key是 queueName2.发送 消息时,得指定 交换机名字为"", RK 为 queueName

默认交换机的 消息入队列是 指定队列,消息出队列的模式是 负载均衡模式

1.Fanout(没有绑定Key,仅此交换机是没有绑定key,且发送消息的时候,不用带 RK)1.消息入队列:(所有)会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中此交换机没有 routing-key的概念,交换机和队列绑定(不用 绑定key)消息过来,直接发给所有 与 Fanout绑定的queue, 2.消息出队列:(广播)queue中的消息广播给所有订阅该消息的消费者,不是负载均衡的 这是发布订阅模式,注意要与 JMS的发布订阅模式区分开

2.Direct(RK 和 BK 的 匹配规则是 相等)1.消息入队列:(指定队列)消息先到交换机,交换机匹配 routing-key和 Blinding-key 相等,消息入指定队列2.消息出队列:(负载均衡)

3.Topic(类似 direct,只是 RK 和 BK 的匹配规则不一样)topic类型的交换器在direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,这里的匹配规则稍微不同,它约定:BindingKey和RoutingKey一样都是由"."分隔的字符串;BindingKey中可以存在两种特殊字符“*”和“#”,用于模糊匹配,其中"*"用于匹配一个单词,"#"用于匹配多个单词(可以是0个)。1.消息入队列:(指定)消息先到交换机,交换机匹配 routing-key和 Blinding-key 按匹配规则来 匹配成功,消息入指定队列2.消息出队列:(负载均衡)

我们知道 direct和topic,发送消息 都得指定 RK,即发消息到指定 queue ,但是我们可以采取,即 所有queue和 direct交换机绑定key 都相同,那么 一条消息,就会 发送给 allqueue,达到 fanout交换机发送消息的消息(不是消费消息),这其实就是 RK 匹配 BK 的取巧方式(叫做 direct的 多重绑定) RabbitMQ数据存储 存储机制(简单来说,quque内存中有消息,有 队列索引,磁盘中有index文件(队列索引存在 磁盘中),和queue_store文件(消息存在磁盘中)),即queue的内存中可能有消息,消息索引,但是内存中的消息和消息索引也可能会继续磁盘化(这要根据cpu的状态消息的多少,动态的调整queue的状态)

RabbitMQ消息有两种类型:1、持久化消息和非持久化消息。2、这两种消息都会被写入磁盘。持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。

临时队列就是一个非持久化的、排他的、自动删除的队列,并且名字是服务器随机生成的。适用场景,不想指定队列名,并且想 消费者不绑定队列,队列直接删除 RabbitMQ存储层包含两个部分:队列索引和消息存储。 队列索引(这里是磁盘的不是queue内存的):rabbit_queue_index(由多个 .ind文件组成,0.ind,1.ind,按顺序存,每个 .ind有大小,超过大小,就存在下一个)注意:index中也可以存消息,当message较小时,存在index 中可以直接取,不用再去 message_store中去IO,节省时间,这里的index也是磁盘的index,不是说的 queue内存中直接存储message的那种情况,是说的,queue中的 message和index都得存到 磁盘中去,index肯定存到 index文件中,message可能存到 index文件,也可能存在 message_store中去

1.索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。每个队列都有相对应的索引。 即不是存的 message主体消息,而是存的每个 message的 元数据(存储地点、是否已被给消费者接收...) 到时候,内存从磁盘中 读 message的时候,通过 队列索引,按顺序,得到message的元数据,然后去 消息存储中 按顺序 取 message加载到 内存中2.索引使用顺序的段文件来存储,后缀为.idx,文件名从0开始累加,每个段文件中包含固定的segment_entry_count 条记录,默认值是16384 每个index从磁盘中读取消息的时候,至少要在内存中维护一个段文件,所以设置queue_index_embed_msgs_below 值得时候要格外谨慎, 一点点增大也可能会引起内存爆炸式增长

消息存储:rabbit_msg_store(msg_store_persistent/msg_store_transient)一个虚拟主机共享一个 消息存储

1.消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个2.存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存 储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。3.store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件 中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新 的消息写入。文件名从0开始进行累加。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息

消息的存储和读取

1.消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。 最佳的方式是较小的消息存在index中,而较大的消息存在store中。这个消息大小的界定可以通过queue_index_embed_msgs_below 来配置 默认值为4096B。当一个消息小于设定的大小阈值时,就可以存储在index中,这样性能上可以得到优化。 一个完整的消息大小小于这个值,就放到索引中,否则放到持久化消息文件中2.读取消息时,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直 接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由store进行处理。 删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。 在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记 为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效 数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的 数据大小的比值超过设置的阈值garbage_fraction(默认值0.5)时,才会触发垃圾回收,将这两个文件 合并,执行合并的两个文件一定是逻辑上相邻的两个文件。合并逻辑:1、锁定这两个文件2、先整理前面的文件的有效数据,再整理后面的文件的有效数据3、将后面文件的有效数据写入到前面的文件中4、更新消息在ETS表中的记录5、删除后面文件

队列结构

1.通常队列由rabbit_amqqueue_process和backing_queue这两部分组成, rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消 息、处理消息的确认(包括生产端的/confirm/i和消费端的ack)等。backing_queue是消息存储的具体形 式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。2.如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费 者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递3.rabbit_variable_queue.erl 源码中定义了RabbitMQ队列的4种状态:1、alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU2、beta:消息索引存内存,消息内存存磁盘3、gama:消息索引内存和磁盘都有,消息内容存磁盘4、delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作4.消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发送变化5.持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种 gama状态只有持久化消息才会有的状态6.在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量 (target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息 可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。7.对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue, 其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。


8.消费者获取消息也会引起消息的状态转换。9.当消费者获取消息时1、首先会从Q4中获取消息,如果获取成功则返回。2、如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列 为空,即此时队列中无消息。 3、如果Q3不为空,则取出Q3中的消息;进而再判断此时Q3和Delta中的长度,如果都为空,则可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4 中获取消息4、如果Q3为空,Delta不为空,则将Delta的消息转移至Q3中,下次可以直接从Q3中获取消息。 在将消息从Delta转移到Q3的过程中,是按照索引分段读取的,首先读取某一段,然后判断读 取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中己无消 息,则直接将Q2和刚读取到的消息一并放入到Q3中,如果不相等,仅将此次读取到的消息转移到Q3。这里就有两处疑问,第一个疑问是:为什么Q3为空则可以认定整个队列为空?1、试想一下,如果Q3为空,Delta不为空,那么在Q3取出最后一条消息的时候,Delta 上的消息就会被转移到Q3这样与 Q3 为空矛盾;2、如果Delta 为空且Q2不为空,则在Q3取出最后一条消息时会将Q2的消息并入到Q3中,这样也与Q3为空矛盾;3、在Q3取出最后一条消息之后,如果Q2、Delta、Q3都为空,且Q1不为空时,则Q1的消息会被转移到Q4,这与Q4为空矛盾其实这一番论述也解释了另一个问题:为什么Q3和Delta都为空时,则可以认为 Q2、Delta、Q3、Q4全部为空通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态对于持久化消息,它一定会进入gamma状态,在开启publisher /confirm/i机制时,只有到了gamma 状态时才会确认该消息己被接收若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态

为什么消息的堆积导致性能下降?

消息一多,消息进磁盘,并且index中已经存不下 message了,只有 存在 message_store中,增加了IO次数应对这一问题一般有3种措施:1、增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。2、采用multiple ack,降低处理 ack 带来的开销3、流量控制

rabbitmq 用户权限 是指,对哪些 虚拟主机可以操作(操作权限有哪些:配置?读?写?) RabbitMQ工作流程详解 生产者发送消息的流程

1、生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel) 2、生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等 1.持久化 是指 mq重启,这个 交换机 还是否存在3、生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等 1.声明队列的时候,若mq的 virtual中 有这个队列,那么声明的属性必须和 mq中的那个队列相同 若mq中的 virtual 中没有,那么创建 一个queue 2.queue的持久化是指,mq重启,此queue还是否存在 3.queue的自动删除是指,queue如果 绑定任何 消费者或 没有任何生产者向它发消息 (前提,消费者 和 queue 之间 是 推送消息,不是 消费者 主动 拉取消息) 那么mq自动删除 此 queue ***4.排他性:暂时不知道 4、生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来注意:交换机的绑定有两种1.channel.exchangeBind 交换机 绑定 交换机2.channel.queueBind 交换机 绑定 队列5、生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息6、相应的交换器根据接收到的 routingKey 查找相匹配的队列。7、如果找到,则将从生产者发送过来的消息存入相应的队列中。8、如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者9、关闭信道。10、关闭连接。

消费者接收消息的过程

1、消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。2、消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作channel.basicGet(queueName,boolean aotoAck) 消费者 主动 向 指定 queue 拉取消息channel.basicConsunme(queue, boolean autoAck,回调成功函数,回调失败函数)queue 给 消费者 推送 消息消息的确认机制 autoAck 是在 这里 指定的3、等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。4、消费者确认( ack) 接收到的消息。5、RabbitMQ 从队列中删除相应己经被确认的消息。6、关闭信道。7、关闭连接。

消费者和生产者,都可以 声明queue(注意,mq中已存在,那声明的queue要和,mq中的一模一样,具体是哪方声明,应业务的不同导致工作模式的不同,而确定(例如topic工作模式,那么消费者声明 queue即可,消费者指定 queue和交换机的 绑定规则)

),exchange,绑定queue和exchange,在真正,basicPublish和basicConsume之前,mq都不知道谁是生产者谁是消费者

虚拟主机如果用 setUri的方式 指定,那么 虚拟主机若是 /,那么在 Uri 中 写成 %2f 特别注意:消费者 channel打开后,消费者执行消费是异步的,这时不能把channle关闭,channel关闭了,消费者就无法消费消息了,可以理解成,channel.basicConsume(“queue.biz”, (s, delivery) -> {System.out.println(new String(delivery.getBody()));}, (s)->{});和 Consume是 异步的,是先 channel.close还是先执行 handler方法,是不确定的,因为是异步的 Connection 和Channel关系

1.生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建 立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是 建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。

2.简单来说,考虑到 一个线程,用一个连接,太浪费资源(tomcat,redis,mysql...都是用的这种) 那么用多个线程共享一个连接即可(NIO),但这时又出现一个问题,一个连接多个线程共享, 那会导致的问题不是浪费连接资源了,是一个连接无法承受多个线程共享了,那么此时就多用几个连接,把线程分摊给 多个连接即可3.Connection 里面有很多 channel,线程通过channle访问 mq, 其实 最后 channel访问时,也是用connection在访问 connection(里面是 java NIO 的 select模型)来去 和 mq交互

mq的源码暂时难以理解 RabbitMQ工作模式详解 Work Queue(交换机是direct)

1.生产消息消息发送给交换机,交换机根据 RK 匹配指定的 BK的 队列2.消费消息多个消费者消费一个队列(负载均衡模式)3.模式:一个队列,多个消费者4.适用场景:生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果

路由模式(direct)

1.生产消息消息发送给交换机,交换机根据 RK 匹配指定的 BK的 队列2.消费消息多个消费者消费一个队列(负载均衡模式)3.模式:多个队列,多个消费者4.适用场景:对不同的消息做不同的处理,即不同消息,被不同队列接收,每个队列代表不同类型消费者消费 例如:将不同日志级别的日志记录交给不同的应用处理。5.和上面 工作模式 没啥区别,就是队列多了点(业务程度的不同,原理没有不同)

主题模式(topic)

1.生产消息消息发送给交换机,交换机根据 RK 匹配指定的 BK的 队列这个指定队列,就是匹配规则和 direct不同2.消费消息多个消费者消费一个队列(负载均衡模式)3.模式:多个队列,多个消费者4.适用场景:对不同的消息做不同的处理,即不同消息,被不同队列接收,每个队列代表不同类型消费者消费 并且区别于 direct 是 想把 queue能接收的消息 范围 扩大,以至于 消费者 可以接收 更宽大范围的消息 例子:不仅想根据日志级别划分日志消息,还想根据日志来源划分日志,怎么做? 即多个消息类型,可以匹配到一个 queue , 让 一个queue可以 接收 不同 RK 的 消息,扩大了消费者能消费的 消息的范围5.BK 的定义规则 1、* (star)匹配一个单词 2、# 匹配0到多个单词

发布订阅模式(fanout)

1.生产消息消息发送到交换机,不用带 RK,交换机将消息发送给,和此交换机绑定的 所有 队列2.消费消息多个消费者消费一个队列(广播模式,所有消费者 拿到 所有 消息)消费者不在线,就无法拿到3.模式:发送也广播,推送也广播4.适用场景:每个消费者都可以消费到完整的消息。

Spring整合RabbitMQ

simpleMessageListenerContainer是一个容器,用到监听器推送消息模式时使用,它的作用是用来管理监听器的
但是我们注入的时候是注入,simpleMessageListenerContainerFactory即可,它会帮我们创建容器
而且这个 factory的可以设置,例如消息消费是否 autoAck , 消费者的最大连接数…
并且 我们注入 带有 @rabbitLinstener方法的 组件 ,并且要 @enableRabbit
@EnableRabbit的作用是,注入 RabbitListenerAnnotationBeanPostProcessor,这样,在创建 有@rabbitLinstner方法的 组件的时候,这个后置处理器会 处理这个bean,并开启监听,这样这个组件就器作用了,不过 用springboot的话,这个后置处理器以及有了,所以就不用 @EnableListener了

我们一般在,rabbitConfig里面去完成上面这一些组件的注册,以及绑定关系,消费者就直接用template去 写消息和 消费 消息即可

用springboot 都不需要写 消费消息的语句了即不用写 rabbitTemplate.receive(queueName)了,
直接准备一个 监听实体类即可,默认就会去消费消息

如果是 推送消息,rabbitConfig里面还要注册 一个 SImpleRabbitListenerContainerFactory去创建 监听器的容器

RabbitAdmin是用来声明 queue , exchange , 并且绑定他们的,Queue , Exchange , Blind只是类,我们要注册,但是真正的 声明,绑定,是RabbitAdmin做的

关于 连接工厂有多种实现类

最常用的就是这个 caching的
连接工厂,创建连接和通道,关闭的时候,都不是真正的关闭,都是退回到连接池(类似连接池技术,避免不断,开关 connection)

对于这个 cachingConnectionFactory 它的特点是 缓存模式是 CHANNEL ,即对通道缓存

在springboot中,只需要注册,queue,exchange,blind,listener(剩下的,rabbitAdmin ,连接工厂,RabbitTemplate , @EnableRabbit , 包括 监听器容器工厂)都不需要注册了
要更改这些没有注册的信息的时候,用配置文件,或者注册一个替代默认的即可

关于 监听方法的 参数列表 RabbitMQ高级特性解析 消息可靠性

1.什么叫消息的可靠性(mq角度)1.简单来说,消息发送到mq必须要保证mq成功接收到消息,消息被消费者消费,必须向mq发送消息确认消息已经消费2.消息队列是为了保证最终一致性,我们需要确保消息队列有ack机制 客户端收到消息并消费处理完成后 客户端发送ack消息给消息中间件 如果消息中间件超过指定时间还没收到ack消息,则定时去重发消息。 还有生产者一端,消息是否成功发送了(得有一个确认机制),如果没有成功发送,要考虑事务回滚,重发 2.可以从以下几方面来保证消息的可靠性:1、客户端代码中的异常捕获,包括生产者和消费者2、AMQP/RabbitMQ的事务机制3、发送端确认机制4、消息持久化机制5、Broker端的高可用集群6、消费者确认机制7、消费端限流8、消息幂等性

保证消息的可靠性的几个方面(可能多个一起使用,目的就是保证消息的正确性可靠性,以及消息不是可靠的即不是我们想的那样,我们怎么处理) 1.异常捕获机制(缺点:发送消息的过程没有捕获到异常,不代表消息发送成功)

先执行行业务操作,业务操作成功后执行行消息发送,消息发送过程通过try catch 方式捕获异常,在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。也就是说,发送消息如果异常,那么重发消息,但是 没有异常 也可能 mq没有接收到消息另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试

2.AMQP/RabbitMQ的事务机制

1.没有捕获到异常并不能代表消息就一定投递成功了。 那么上面那种单纯的 异常捕获就不行,要用这种事务机制 2.一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销 比较大,一般也不推荐使用。

3.发送端确认机制

1.发送消息2.mq 接收消息后,会 回传 确认消息(消息包含deliveryTag 字段包含了确认消息的序号,另外,通过设 置channel.basicAck方法中的multiple参数 true,表示到这个序号deliveryTag 之前的所有消息是否都已经得到了处理了) 通常 multiple = true的适用场景 在mq批量接收到一定数量的消息后,回传确认消息时设置 成 true,delivery Tag代表这个序号的以及之前的消息发送成功 multiple = false的适用场景 在mq每接收到一条消息,回传确认消息时设置 成 false , delivery Tag代表这个序号的消息发送成功 3.回传确认消息 可以是同步的,也可以是异步的1.同步1.单个消息发送,然后回传确认消息,拿到确认消息,结束生产2.批量处理多个消息发送完,然后回传确认消息,这时候 multiple = true , 这时的确认消息 就是多条确认消息,然后拿到确认消息,结束生产2.异步addConfirmListener 方法可以添加ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含两个方法:handleAck 和handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。4.在mq发送 确认消息的时候(basic.ack , basic.nack),如果出现了异常,那么需要捕获 而且这是最重要的,我们原本的目的是解决 消息的可靠性,当 mq接收的消息出现了问题,可能是 各种类型的异常 或 未接收到消息等,发送端消息确认只是,mq给我们客户端的一些 确认接收/未确认接收的一些信息,我们拿着这些信息 要去 针对的考虑 解决办法











spring.rabbitmq.publisher-/confirm/i-type=correlated
spring.rabbitmq.publisher-returns=true
是指,开启发送端消息确认机制,而且 发送端 和 mq 是 correlated 即 相关的,相关性有 消息的Delivery Tag ID保证,mq回传 确认消息的时候,也要回传 指定的 Delivery Tag ID






4.持久化存储机制

1、Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不丢失2、Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不丢失。3、消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。4.RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处理) 这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:1、队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息,包括 消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应 的rabbit_queue_index。2、消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列列 共享,在每个节点中有且只有一个。5.当持久化消息,小于4096字节(包含消息体、属性及headers),直接存储在 队列索引中 (rabbit_queue_index)

5.消费端确认机制(Consumer ACK)

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。