1、MQ作用2、MQ产品对比3、RocketMQ基本概念4、RocketMQ系统框架
4.1.Producer4.2.Consumer4.3.Name Server4.4.Broker 5、RocketMQ工作流程 1、MQ作用
消息队列是一种“先进先出”的数据结构。
其应用场景主要包含以下3个方面:
应用解耦
系统的耦合性越高,容错性就越低。比如用户创建订单后,如果耦合调用物流系统,如果系统出了故障,都会造成下单操作异常,影响用户使用体验。
比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,取出消息队列中的订单消息即可。
流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就阻止用户请求,这会影响用户体验,如果使用消息队列将请求缓存起来,等待系统处理完毕后告知用户结果,提高了体验。
数据分发
通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
2、MQ产品对比 3、RocketMQ基本概念
消息(Message):
指消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
主题(Topic):
Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
一个生产者可以同时发送多种Topic的消息;而一个消费者只能消费一种Topic的消息。
标签(Tag):
为消息设置的标签,用于同一主题下区分不同类型的消息,可以根据不同业务类型在同一主题下设置不同标签。
Topic是消息的一级分类,Tag是消息的二级分类。
队列(Queue)
一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。
一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费,不能被消费多次。
消息标识(MessageId/Key)
RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。
msgId: 由producer端生成,其生成规则为:producerIp + 进程pid +hashCode +当前时间 + AutomicInteger自增计数器offsetMsgId: 由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量)key: 由用户指定的业务相关的唯一标识。 4、RocketMQ系统框架
rocketmq架构上主要分为四部分构成:Producer、Consumer、Name Server、Broker
消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
4.2.Consumer消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。一个Topic类型的消息可以被多个消费者组同时消费,同一个组中只能有一个消费者消费消息。
消费者可以组从消费者组,实现负载均衡和容错
负载均衡: 将一个Topic中的不同的Queue平均分配到消费者组中的不同消费者。容错: 一个消费者宕机以后,该消费者组中的其它消费者可以接着消费原消费者的Queue。消费者的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不会消费消息。 4.3.Name Server
NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。
Broker管理: 接受Broker集群的注册信息,提供心跳检测机制,检查Broker是否还存活。
路由信息管理: 每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。
路由注册
在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。对于Broker,必须明确指出所有NameServer地址,因此NameServer并不能随便扩容。因此,若Broker不重新配置,新增的NameServer对于Broker来说是不可见的,其不会向这个NameServer进行 注册。
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等
路由剔除
NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。
路由发现
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认**客户端(是Producer与Consumer)**每30秒会拉取一次最新的路由。
拓展:rabbitmq 同时存在pull与push模型
客户端连接NameServer策略
客户端(是Producer与Consumer) 在配置时必须要写上NameServer集群的地址,首先采用的是随机策略进行的选择,失败后采用的是轮询策略。
随机策略: 客户端首先会生产一个随机数,然后再与NameServer节点数量取模,此时得到的就是所要连接的节点索引。轮询策略: 如果连接失败,则会采用轮询策略,逐个尝试着去连接其它节点。 4.4.Broker
Broker充当着消息中转角色,负责存储消息、转发消息。Broker在rocketmq系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。
Broker Server的功能模块示意图:
Remoting Module: 整个Broker的实体,负责处理来自clients端的请求。Client Manager: 客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。Store Service: 存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。HA Service: 高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。Index Service: 索引服务。根据特定的消息标识(Message key),对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。 5、RocketMQ工作流程
1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。
2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包。
3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。
4)Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息**(Queue与Broker的地址的映射关系)**。然后选择一个Queue,与Queue所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。
5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后获取到要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。