大多应用中,可以通过消息服务中间件来提升系统异步通信、拓展解耦能力。
消息服务中两个重要概念:
消息代理(message broker)目的地(destination)当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地。
消息队列主要有两种形式的目的地
队列(queue):点对点消息通信(point-to-point)主题(topic):发布(publish)/订阅(subscribe)消息通信点对点:
消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列消息只有唯一的发送者和接收者,但并不是说只能有一个接收者
发布订阅模式
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
JMS(Java Message Service)Java消息服务:
基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,也是一个消息代理的规范,兼容JMSRabbitMQ是AMQP的实现
Spring 支持
spring-jms 提供了对JMS的支持spring-rabbit 提供了对AMQP的支持需要ConnectionFactory的实现来连接消息代理提供Jms Template、Rabbit Template来发送消息@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息@EnableJms、@EnableRabbit开启支持
SpringBoot自动配置
JmsAutoConfigurationRabbitAutoConfiguratoin
市面的MQ产品
ActiveMQ、RabbitMQ、RocketMQ、Kafka JMS(Java Message Service) AMQP(Advanced Message Queuing Protocol) 定义Java api网络线级协议跨语言否是跨平台否是支持消息类型1、Peer-2-Peer
2、Pub/Sub1、direct exchange
2.fanout exchange
3、topic change
4.headers exchange
5.system exchange
后四者本质上与JMS中的Pub/Sub模型没有太大差别,仅是在路由机制上做了更详细的划分。支持消息类型TextMessage
MapMessage
BytesMessage
StreamMessage
ObjectMessage
Message(只有消息头和属性)byte[]
(在实际应用中,如果有复杂的消息,可以将消息序列化后发送)综合评价JMS定义了JAVA API层面的标准,在java系统中,多个Client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差AMQP定义了 wire-level层的协议标准,天然具有跨平台、跨语言的特性RabbitMQ概念
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue Protocol)的开源实现。
核心概念 Message消息(Message)是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对其它消息的优先权)、delivery-moda(指出该消息可能需要持久性存储)等。
Publisher消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别
Queue消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列中。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系
Connection网络连接,比如一个TCP连接
Channel信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接。AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这鞋动作都是通过信道来完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,复用一条TCP连接。
Consumer消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/
Broker表示消息队列服务器实体
Docker安装RabbitMQdocker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369,25672(Erlang发现&集群端口)
5672,5671(AMQP端口)
15672(微博管理后台端口,账号密码默认都是:guest)
61613,61614(STOMP协议端口)
1883,8883(MQTT协议端口)
Exchange 类型Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型 direct 、 fanout 、 topic 、 headers 。headers 匹配 AMQP 消息的 header 而不是路由键。header 交换机和 direct 交换机完全一致,但性能差很多 DireactExchange
如果消息中的路由键(routing-key)于 Binding 中的 binding key 一致,交换器就将消息发送到对应的队列中。路由键需要与队列名(binding key)完全一致。如果一个队列绑定到交换机的binding key 为 “dog”,那么只转发routing key为“dog”的消息。它是完全匹配、单播、点对点模式。
FanoutExchange每个发到 fanout 类型转换机的消息都会被分发到所有绑定的队列中。fanout 不处理路由键,只是简单的将队列绑定到交换机上。每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。fanout 类型转发消息是最快的。
TopicExchangetopic 交换机通过路由键属性和binding key模式匹配的方式进行分配消息到对应的队列中。将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。他将路由键和绑定键的字符串切分成单词,这些单词之间需要用 、隔开,它也是别这两个通配符,分别是 # 和 * 符号
# 表示匹配0个或多个单词
* 表示匹配一个单词
测试Exchange测试之前先创建四个队列,分别是 gulimall 、 gulimall.emps 、 gulimall.news 、 gulixueyuan.news
点击 Queue选项
设置队列信息保存即可
DireactExchange创建一个Direct交换机
点击Exchange选项
创建 direct交换机
点击刚刚创建的交换机
新增绑定关系
绑定刚刚创建的所有队列
测试选择交换机进行测试
发送测试消息
可以看到我们指定的队列接到两条消息了
单机这个队列,然后获取消息
Ack Mode 消息处理方式
Nack message requeue true 接收消息但不做确认,消息会重新加入队列reject requeue false 拒绝消息,消息会被删除reject requeue false 拒绝消息,消息会重新加入队列Automatic ack 获取消息,应答确认,消息不重新入队,将会从队列中删除 FanoutExchange
创建一个 fanout 类型的交换机
进入交换机绑定队列
绑定刚刚创建的所有队列
测试当前绑定的队列消息都是空的
测试发送消息
可以发现我们所有的队列都收到了这条消息
先让任意一个队列获取这个消息并回复确认看看
再次查看队列状态,发现只有刚刚处理掉消息的队列删除了这个消息,其它队列还在等待处理。
TopicExchange创建Topic交换机
创建成功后就给他绑定队列
使用 gulimall.# 表示路由键以 gulimall 单词开头的都可以匹配到
根据以下规则绑定队列
测试发送一个路由键是 gulimall.news 的消息
全部队列都收到了,因为gulimall.# 表示只要是以gulimall.开始的任意字符都能匹配,*.news 表示news前面必须要有一个任意单词
再发一个 hello.news ,绑定键能匹配上的只有 gulixueyuan.news 这个队列
SpringBoot 整合RabbitMQ第一步肯定是要引入依赖
引入这个依赖后,在 RabbitAutoConfiguration 中,配置了如下几个重要的Bean:
RabbitConnectionFactoryBean 连接工厂RabbitTemplate 操作组件AmqpAdmin 高级消息队列管理
引入后需要配置最基础的配置:
spring.rabbitmq.host=192.168.135.128 # 缺省,默认是localhostspring.rabbitmq.port=5672 # 缺省,默认是5671、5672spring.rabbitmq.virtual-host=/ # 缺省,默认/
如需进一步配置,可到 org.springframework.boot.autoconfigure.amqp.RabbitProperties 类中查看,这是 Rabbit 配置类
如何创建 Exchange、Queue、Binding通过 AmqpAdmin 高级消息队列管理组件来进行创建 Exchange、Queue、Binding。
amqpAdmin.declareExchange(Exchange);
declareExchange 方法需要传一个 exchange
根据上图,如果我们需要创建 TopicExchange ,我们需要创建一个 TopicExchange.class 实例给到 declareExchange 方法
实例参数分别是:交换机名称、Durable是否持久化、是否自动删除、交换机参数
TopicExchange topicExchange = new TopicExchange( "hello.java.TopicExchange", true, false, new HashMap());amqpAdmin.declareExchange(topicExchange);
同样的,可以使用 declareQueue 方法创建一个队列
Queue queue = new Queue( "hello.java.queue", true, false, false, new HashMap<>());amqpAdmin.declareQueue(queue);
队列创建参数分别是:队列名称、是否持久化、是否排他(只要有一个连接连上队列,其它连接就不能再连接)、是否是自动删除、队列参数
交换机和队列创建完毕之后,如需创建绑定关系,可以使用 declareBinding 方法创建
Binding binding = new Binding( "hello.java.queue", Binding.DestinationType.QUEUE, "hello.java.TopicExchange", "hello.java.queue", new HashMap<>());amqpAdmin.declareBinding(binding);
Binding 类有如下参数:
destination 目的地(交换机/队列名称)destinationType 绑定类型(和什么东西进行绑定)
EXCHANGE 指明目的地是一个交换机QUEUE 指明目的地是一个队列 exchange 交换机名称routing-key 路由键argument 绑定参数 c
代码参数可参考GUI操作内容:
绑定结果
发送消息一切准备就绪后,通过 RabbitTemplate 来发送、接收消息
Message 需要传递一个字节流来进行初始化实例
send 可以将指定Message 实例发送到指定交换机
String halo = "hello world";Message message = new Message(halo.getBytes());rabbitTemplate.send("hello.java.TopicExchange", "hello.java.queue", message);
同时也可以使用 convertAndSend 方法将一个对象序列化为byte发送。
rabbitTemplate.convertAndSend( "hello.java.TopicExchange", "hello.java.queue", new User());
可以配置一个 MessageConverter 将对象转为JSON格式或其它格式,前提是被序列化对象一定要实现 Serializeble 接口。
@Configurationpublic class RabbitConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }}
监听消息若想接收 Rabbit 的消息,SpringBoot 封装了一个@RabbitListener 注解,可以方便我们接收消息,它可以标注在类和方法上。若需要使用这些注解,必须要开启 Rabbit 才能使用
@EnableRabbit...public class GulimallOrderApplication { public static void main(String[] args) { SpringApplication.run(GulimallOrderApplication.class, args); }}
使用 @RabbitListener 监听指定队列,注意:该注解只在 Spring 容器上生效
@RabbitListener(queues = "hello.java.queue")public void receiveMessage(Message message, User user) { MessageProperties messageProperties = message.getMessageProperties(); log.info("消息头属性信息:{}", messageProperties); log.info("接收到消息:{}", user);// 接收到消息:User(name=Clover You)}
如果需要获取通道,可以使用 com.rabbitmq.client.Channel 参数,例如:
@RabbitListener(queues = "hello.java.queue")public void receiveMessage(Channel channel);
RabbitMQ 消息确认机制Queue 可以有很多人都来监听,只要收到洗消息,队列会将消息删除,而且只能有一个人收到从消息。只有一个消息处理完成方法结束后才能再接收下一个消息
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制。publisher confirmCallback 确认模式publisher returnCallback 未投递到 queue 退回模式consumer ack 机制 可靠抵达 ConfirmCallBack
设置 spring-rabbitmq.publisher-confirm=true
在创建 connectionFactory 的时候设置 PublisherConfirms(true)选项开启/confirm/iCallback。CorrelationData 用来表示当前消息唯一性。消息只要倍 broker 接收到就会执行 confitmCallback,如果是 cluster(集群) 模式,需要所有 broker 接收到才会调用 /confirm/iCallback被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到 retrunCallback。
spring-rabbitmq.publisher-confirm=true 配置在新版中被弃用,改用 spring-rabbitmq.publisher-/confirm/i-type=correlated
@Slf4j@Configurationpublic class RabbitConfig { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate() { rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() { @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { log.info("initRabbitTemplate"); } }); }}
ReturnCallback
设置 spring.rabbitmq.publisher-returns=true
设置 spring.rabbitmq.template.mandatory=true
如果消息抵达目标队列,那么首先通过异步的方式调用retrunsCallba ck
confrim 模式只能保证消息抵达 broker,不能保证消息准确投递到目标 queue 中。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。如果未能投递到目标 queue 里那么将调用 returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
@Slf4j@Configurationpublic class RabbitConfig { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate() { rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { Message message = returned.getMessage();// 投递失败的消息 String replyText = returned.getReplyText();// RabbitMQ回复信息 String routingKey = returned.getRoutingKey();// 投递时使用的路由键 int replyCode = returned.getReplyCode();// RabbitMQ 回复的状态码 String exchange = returned.getExchange();// 当时消息是发给哪个交换机的 log.info("message: {}", message); log.info("replyText: {}", replyText); log.info("routingKey: {}", routingKey); log.info("replyCode: {}", replyCode); log.info("exchange: {}", exchange); } }); }}
==================== Returned Callback ====================message: (Body:'{"name":"Clover You"}' MessageProperties [headers={__TypeId__=top.ctong.gulimall.order.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])replyText: NO_ROUTEroutingKey: hello.java.queue1replyCode: 312exchange: hello.java.TopicExchange
消费端确认消费端确认机制可以保每个消息都能被正确的处理,此时 broker 才可以删除这个消息。
默认是自动确认,只要消息接收到,客户端会自动回复确认,服务端接收到确认消息后就会自动删除这个消息。如果需要设置为手动确认,配置需要添加 spring.rabbitmq.listener.simple.acknowledge-mode=manual手动确认模式,只要我们没有明确告诉MQ消息已被处理/没有Ack,消息就一直是 Unacked 状态。即使 Consumer宕机,消息也不会丢失,会重新变为 Ready 状态。发送4个消息
通过断点阻塞处理方法,所以上图是 Ready=0 ,Unacked=4
停掉所有断点后检查Queue,状态变为未处理:Ready=4 ,Unacked=0
如果需要回复确认,需要使用 Channel 来进行回复
@RabbitListener(queues = "hello.java.queue")public void receiveMessage(Message message, User user, Channel channel) { MessageProperties messageProperties = message.getMessageProperties(); log.info("消息头属性信息:{}", messageProperties); log.info("接收到消息:{}", user); try { // 第二个参数表示不批量确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); }}
同样的,有确认模式也有拒绝模式
@RabbitListener(queues = "hello.java.queue")public void receiveMessage(Message message, User user, Channel channel) { ... try {// 拒绝消息,第三个参数是表示:true 拒绝后消息重新回到队列,false 拒绝后丢弃消息不重新加入MQ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (IOException e) { e.printStackTrace(); }}
Ack消息确认机制消费者获取到消息,成功处理,可以回复 Ack 给 Broker
basic.ack 用于肯定确认;broker 将移除此消息basic.nack 用于否定确认;可以指定 broker 是否丢弃此消息,可以批量basic.reject 用于是否确认,同上,但不能批量 默认:消息被消费者收到,就会从 broker 的 queue 中移除消费者收到消息,默认会自动 ack。但是如果无法确定此消息是否被处理完成,或者成功处理,我们可以开启手动 abc 模式
消息处理成功,ack(),接受下一个消息,此消息 broker 就会移除消息处理失败,nack()/reject() ,将重新发送给其他人进行处理,或者容错处理后 ack消息一直没有调用 ack/nack 方法,broker 认为此消息正在被处理,不会投递给其他人。如果此时客户端断开,消息不会被 broker 移除,会投递给别人。
欢迎访问我的博客:https://www.ctong.top
博文地址:https://www.ctong.top/2022/02/25/RabitMQ%E5%9F%BA%E7%A1%80-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/