欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

消息中间件RabbitMQ(六)——如何保证消息的可靠性

时间:2023-07-09
文章目录

1、消息有效期

1.1 默认情况1.2 关于TTL

1.1.1 单条消息过期示例1.1.2 消息队列过期示例 1.3 死信队列

1.3.1 死信交换机1.3.2 死信队列1.3.3 代码示例 2、消息发送可靠性

2.1 RabbitMQ 消息发送机制2.2 确认消息成功方法

2.2.1 开启事务机制2.2.2 发送方确认机制 2.3 失败重试

2.3.1 自带重试机制2.3.2 消息发送失败重试 3、消息消费可靠性

3.1 消息消费两种模式3.2 消息消费成功方法3.3 消息拒绝3.4 消息确认

3.4.1 自动确认3.4.2 手动确认

3.4.2.1 推模式手动确认3.4.2.2 拉模式手动确认 3.5 幂等性问题

你知道RabbitMQ如何保证消息可靠性吗?这是面试时的一个问题,下面就来了解下RabbitMQ中,如何保证消息的可靠性。先从消息有效期开始吧~

1、消息有效期 1.1 默认情况

在RabbitMQ中,默认情况下,如果在发送消息时,不设置消息过期相关参数,那么消息是永不过期的,即使消息没有被消费掉,消息也会一直存储在队列中

1.2 关于TTL

TTL(Time-To-Live),表示消息存活时间,即消息的有效期

可以通过给消息设置 TTL让消息存活一定的时间,如果消息存活时间超过了 TTL并且还没有被消息,那么消息就会变成死信,那什么是死信呢?带着疑问继续

在RabbitMQ中,有两种方式给消息设置TTL:

方式一:在声明队列时,可以给队列设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期方式二:在发送消息时,设置消息的有效期,这样不同的消息就具有不同的有效期如果两个都设置了,以时间短的为准

当给消息设置有效期后,如果消息过期没有被消费就会被从队列中删除,进入到死信队列,但是这两种方式对应的删除时机有点差异:

方式一:当为消息队列设置过期时间时,消息过期了就会被删除,因为消息进入 RabbitMQ后是存在一个消息队列中,队列的头部是最早要过期的消息,所以 RabbitMQ只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除方式二:当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会被删除,因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除 1.1.1 单条消息过期示例

单条消息设置过期时间,就是在消息发送的时候设置一下消息有效期即可

配置文件 application.properties :

server.port=8889spring.rabbitmq.host=192.168.43.86spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/

配置类TTLRabbitMQConfig:

@Configurationpublic class TTLRabbitMQConfig { // 交换机的名称 public static final String SCORPIOS_TTL_EXCHANGE_NAME = "scorpios_ttl_exchange_name"; // 发送队列名称 public static final String SCORPIOS_TTL_MSG_QUEUE = "scorpios_ttl_msg_queue"; @Bean DirectExchange directExchange(){ return new DirectExchange(TTLRabbitMQConfig.SCORPIOS_TTL_EXCHANGE_NAME,true,false); } @Bean Queue queue() { return new Queue(TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,true,false,false); } @Bean Binding bindingMsg(){ return BindingBuilder.bind(queue()).to(directExchange()).with(TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE); }}

在创建队列时,第三个从参数表示排他性,如果设置为 true,则该消息队列只有创建它的 Connection才能访问,其他的 Connection都不能访问该消息队列,如果试图在不同的连接中重新声明或者访问排他性队列,那么系统会报一个资源被锁定的错误。另一方面,对于排他性队列而言,当连接断掉的时候,该消息队列也会自动删除,无论该队列是否被声明为持久性队列都会被删除

消息发送着:

@Slf4j@RestControllerpublic class RabbitMQController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String send() { // 创建消息对象 Message message = MessageBuilder.withBody("message set ttl ...".getBytes(StandardCharsets.UTF_8)) .setExpiration("20000") .build(); rabbitTemplate.convertAndSend(TTLRabbitMQConfig.SCORPIOS_TTL_EXCHANGE_NAME,TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,message); return "success"; }}

在创建 Message对象时,设置消息过期时间,这里设置消息过期时间为 20 秒

启动项目,在浏览器中输入:http://localhost:8889/send/message

当消息发送成功之后,由于没有消费者,所以这条消息并不会被消费,打开 RabbitMQ Web管理页面,查看 Queues选项卡,20秒之后,会发现消息会被删除

1.1.2 消息队列过期示例

一旦给消息队列设置消息过期时间,所有进入到该队列的消息都有一个相同的过期时间,下面来看看给消息队列设置过期时间

配置文件同上,在创建队列时,给队列设置消息过期时间,如下:

@BeanQueue queue() { Map setting = new HashMap<>(); setting.put("x-message-ttl", 20000); return new Queue(TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,true,false,false,setting);}

消息发送者:

@Slf4j@RestControllerpublic class RabbitMQController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String send() { // 创建消息对象 Message message = MessageBuilder.withBody("message set ttl ...".getBytes(StandardCharsets.UTF_8)) .build(); rabbitTemplate.convertAndSend(TTLRabbitMQConfig.SCORPIOS_TTL_EXCHANGE_NAME,TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,message); return "success"; }}

在创建消息时,不要再为消息设置过期时间,启动项目,在浏览器中输入:http://localhost:8889/send/message

打开 RabbitMQ Web管理页面,查看 Queues选项卡,可以看到,消息队列的 Features属性为 D 和 TTL,D 表示消息队列中消息持久化,TTL则表示消息会过期,20秒后刷新RabbitMQ Web页面,发现消息数量已经恢复为 0

如果把消息的过期时间设置为0呢?是什么意思呢?

这表示如果消息不能立马消费则会被立即丢掉

1.3 死信队列

在上文提到,如果消息的过期时间到了,消息就会被删除,那么被删除的消息去哪了?真的被删除了吗?

1.3.1 死信交换机

死信交换机,Dead-Letter-Exchange 即 DLX。死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

消息被拒绝(Basic.Reject/Basic.Nack) ,并且设置requeue参数为false消息过期队列达到最大长度

当消息在队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX的消息队列则称为死信队列

DLX本质上也是一个普普通通的交换机,可以为任意队列指定 DLX,当该队列中存在死信消息时,RabbitMQ就会自动的将这个死信消息发布到 DLX上去,进而被路由到另一个绑定了 DLX的队列上,即死信队列。

1.3.2 死信队列

绑定了死信交换机的队列就是死信队列

1.3.3 代码示例

配置文件 application.properties :

server.port=8889spring.rabbitmq.host=192.168.43.86spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/

配置类:定义死信交换机和死信队列和普通的交换机、队列没啥区别

@Configurationpublic class DLXRabbitMQConfig { // 交换机的名称 public static final String SCORPIOS_DLX_EXCHANGE = "scorpios_dlx_exchange"; // 发送队列名称 public static final String SCORPIOS_DLX_QUEUE = "scorpios_dlx_queue"; public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue"; @Bean DirectExchange dlxDirectExchange(){ return new DirectExchange(DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE,true,false); } @Bean Queue dlxQueue() { return new Queue(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE,true,false,false); } // 将死信队列和死信交换机绑定 @Bean Binding dlxBinding(){ return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE); } // 创建一个普通队列,并配置死信交换机 @Bean Queue msgQueue() { Map setting = new HashMap<>(); // 设置死信交换机 setting.put("x-dead-letter-exchange", DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE); // 设置死信 routing_key 与队列名称相同 setting.put("x-dead-letter-routing-key", DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE); return new Queue(DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE, true, false, false, setting); }}

上面需要注意的是,定义一个普通队列,然后配置死信交换机,配置死信交换机有两个参数:

x-dead-letter-exchange:配置死信交换机x-dead-letter-routing-key:配置死信 routing_key

如果发送到这个消息队列上的消息,发生了 Basic.Reject/Basic.Nack 或者过期等问题,就会被发送到 DLX上,进入到与 DLX绑定的消息队列上。

如果为死信队列配置消费者,那么这条消息最终会被死信队列的消费者所消费。死信消息队列消费者:

@Slf4j@Componentpublic class Consumer { @RabbitListener(queues = DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE) public void dlxConsume(String msg) { log.info("死信队列收到的消息为:{}", msg); }}

消息发送者:

@Slf4j@RestControllerpublic class RabbitMQController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String send() { log.info("客户端发送消息"); // 创建消息对象 Message message = MessageBuilder.withBody("message set ttl ...".getBytes(StandardCharsets.UTF_8)) .setExpiration("5000") // 5秒过期 .build(); rabbitTemplate.convertAndSend(DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE,DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE,message); return "success"; }}

启动程序,先查看RabbitMQ Web客户端:

在浏览器中输入:http://localhost:8889/send/message,查看控制台:

在发送消息时,设置了TTL为5秒,5秒后消息没有被消费,则被死信队列的消费者消费

2、消息发送可靠性 2.1 RabbitMQ 消息发送机制

在 RabbitMQ中,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者(Consumer)去消费

从消息发送者的角度考虑消息可靠性,主要从两方面:

消息成功到达 Exchange消息成功到达 Queue

如果这两步都没问题,那么就可以认为消息是发送成功,如果这两步中任何一步出问题,那么消息就没有成功送达

如何知道上面两步是否成功呢?可以引入确认机制,要确保消息成功发送,可以做以下三个步骤:

确认消息到达 Exchange确认消息到达 Queue开启定时任务,定时投递那些发送失败的消息 2.2 确认消息成功方法

上面三个步骤,前两步 RabbitMQ则有现成的解决方案,第三步需要自己实现。确保消息成功到达 RabbitMQ,有两种方法:

开启事务机制发送方确认机制

两种方法,不可以同时开启,只能选择其中之一

2.2.1 开启事务机制

开启RabbitMQ事务机制,需要先提供一个RabbitMQ事务管理器,配置类如下:

@Configurationpublic class TransactionRabbitMQConfig { // 交换机的名称 public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name"; // 发送队列名称 public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue"; @Bean DirectExchange directExchange(){ return new DirectExchange(TransactionRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false); } @Bean Queue queue() { return new Queue(TransactionRabbitMQConfig.SCORPIOS_MSG_QUEUE,true,false,false); } @Bean Binding binding(){ return BindingBuilder.bind(queue()).to(directExchange()).with(TransactionRabbitMQConfig.SCORPIOS_MSG_QUEUE); } // 自定义RabbitMQ事务管理器 @Bean RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); }}

消息生产者需要添加事务注解并设置通信信道为事务模式:

@Servicepublic class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; // 添加事务注解 @Transactional public void sendMessage(){ // 设置通信信道为事务模式 rabbitTemplate.setChannelTransacted(true); // 创建消息对象 Message message = MessageBuilder.withBody("message transaction ...".getBytes(StandardCharsets.UTF_8)) .build(); rabbitTemplate.convertAndSend(TransactionRabbitMQConfig.SCORPIOS_EXCHANGE_NAME, TransactionRabbitMQConfig.SCORPIOS_MSG_QUEUE,message); // 模拟异常发生 int i = 1/0; }}

下面做两种操作:

在方法的最后,人为制造一个运行时异常 ,运行程序,发现消息并未发送成功把 @Transactional 注解和setChannelTransacted方法删除,运行程序,发现即使发生运行时异常,消息依然能发送到RabbitMQ

开启事务机制的测试结果:抛异常,RabbitMQ Web客户端没有收到消息

未开启事务机制的测试结果:抛异常,RabbitMQ Web客户端收到消息

当RabbitMQ开启事务之后,RabbitMQ生产者发送消息会多出四个步骤:

客户端发出请求,将信道设置为事务模式服务端给出回复,同意将信道设置为事务模式客户端发送消息客户端提交事务服务端给出响应,确认事务提交

上面的步骤,除了第三步是本来就有的,其他几个步骤都是开启事务之后多出来的,这样分析,是不是觉得事务模式效率有点低

下面来看一下另一种方法:消息确认机制(publisher /confirm/i)

2.2.2 发送方确认机制

其实发送方消息确认机制,在之前的文章中有用过,先看下面配置,是不是很眼熟:

server.port=8889spring.rabbitmq.host=192.168.43.86spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/# 消息到达交换器的确认回调spring.rabbitmq.publisher-/confirm/i-type=correlated# 开启消息到达队列的回调spring.rabbitmq.publisher-returns=true

spring.rabbitmq.publisher-/confirm/i-type属性配置有三值:

none:表示禁用发布确认模式,默认correlated:表示成功发布消息到交换器后会触发的回调方法simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用

在配置类中添加回调的监听,配置类如下:

@Slf4j@Configurationpublic class /confirm/iRabbitMQConfig implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback, InitializingBean { @Autowired RabbitTemplate rabbitTemplate; // 交换机的名称 public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name"; // 发送队列名称 public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue"; @Bean DirectExchange directExchange(){ return new DirectExchange(/confirm/iRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false); } @Bean Queue queue() { return new Queue(/confirm/iRabbitMQConfig.SCORPIOS_MSG_QUEUE,true,false,false); } @Bean Binding binding(){ return BindingBuilder.bind(queue()).to(directExchange()).with(/confirm/iRabbitMQConfig.SCORPIOS_MSG_QUEUE); } // 为RabbitTemplate绑定回调 @Override public void afterPropertiesSet() { rabbitTemplate.set/confirm/iCallback(this); rabbitTemplate.setReturnsCallback(this); } // 消息到达交换机时回调 @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("{}:消息成功到达交换机",correlationData.getId()); }else{ log.error("{}:消息发送失败", correlationData.getId()); } } // 消息路由到队列失败时回调 @Override public void returnedMessage(ReturnedMessage returned) { log.info("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId()); }}

下面验证两个回到函数是否会被执行,先把消息发到不存在的交换机上:

@GetMapping("/send/exchange")public String sendExchange() { String uuid = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("TestExchange", /confirm/iRabbitMQConfig.SCORPIOS_MSG_QUEUE,"message confirm callback ...".getBytes(StandardCharsets.UTF_8),new CorrelationData(uuid)); return "success";}

注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台日志:

再把消息发到不存在的队列上:

@GetMapping("/send/queue")public String sendQueue() { String uuid = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend(/confirm/iRabbitMQConfig.SCORPIOS_EXCHANGE_NAME, "TestQueue","message confirm callback ...".getBytes(StandardCharsets.UTF_8),new CorrelationData(uuid)); return "success";}

可以看到,消息虽然成功达到交换器了,但是没有成功路由到队列,因为队列不存在,控制台日志:

如果是消息批量处理,那么发送成功的回调监听是一样的,这就是 publisher-/confirm/i 模式

相比于事务,这种模式下的消息吞吐量会得到极大的提升

2.3 失败重试

失败重试有两种情况,一种是没找到 RabbitMQ导致的失败重试,一种是找到 RabbitMQ但是消息发送失败重试

2.3.1 自带重试机制

对于第一种失败重试,就是发送方连不上RabbitMQ,这种情况很好理解,只要你把RabbitMQ的连接地址写错,启动项目,控制台就会一直报重连日志,这个重试机制和 RabbitMQ本身没有关系,是利用 Spring 中的 retry 机制来完成的,可做如下配置:

# 开启重试机制spring.rabbitmq.template.retry.enabled=true# 重试起始间隔时间spring.rabbitmq.template.retry.initial-interval=1000ms# 最大重试次数spring.rabbitmq.template.retry.max-attempts=10# 最大重试间隔时间spring.rabbitmq.template.retry.max-interval=10000ms# 间隔时间乘数,此处为2,表示第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推spring.rabbitmq.template.retry.multiplier=2

2.3.2 消息发送失败重试

对于第二种失败重试,消息发送失败重试主要是针对消息没有到达交换器的情况

如果消息没有成功到达交换器,可以用RabbitTemplate./confirm/iCallback来触发消息发送失败回调,在这个回调中,可以做很多事情,比如可以把失败消息存入数据库,写个定时任务,不断重试,也可以把所有消息都做记录。

具体场景具体分析吧~

3、消息消费可靠性

上述确保了消息发送的可靠性,但还是要考虑一个问题:消息消费的可靠性

3.1 消息消费两种模式

RabbitMQ 消息消费,有两种方式:

推(push):MQ 主动将消息推送给消费者,此方式需要消费者设置一个缓冲区去缓存消息,对于消费者来说,内存中总是有一堆需要处理的消息,这种方式效率比较高,也是目前大多数应用采用的消费方式拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果消费者需要批量拉取消息,可以采用这种方式

对于MQ推(push)方式,上面的例子都是这一种,也就是通过 @RabbitListener 注解去标记消费者,当监听的队列中有消息时,就会触发该方法:

@Slf4j@Componentpublic class Consumer { @RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE) public void consume(String msg) { log.info("收到的消息为:{} ",msg); }}

对于消费者拉(pull)取消息方式,使用RabbitTemplate中的方法:

public void consume() throws UnsupportedEncodingException { // 从指定队列拉取消息 Object message = rabbitTemplate.receiveAndConvert(RabbitConfig.SCORPIOS_MSG_QUEUE); log.info("message: {} " + new String(((byte[]) message),"UTF-8"));}

receiveAndConvert 方法执行完后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息

receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间N秒,如果队列中没有消息了,则 receiveAndConvert 方法会阻塞N秒,N秒内如果队列中有了新消息就返回,N秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0

如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可

3.2 消息消费成功方法

为了确保消息能被消费者成功消费,RabbitMQ 中提供了消息消费确认机制,当消费者去消费消息时,可以通过设置 autoAck 参数来表示消息消费的确认方式

当 autoAck 为 false 时,即使消费者已经收到消息,RabbitMQ 也不会立即移除消息,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除当 autoAck 为 true 时,消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使消息并没有到达消费者

在 RabbitMQ Web 管理页面中:

Ready 表示待消费的消息数量Unacked 表示已经发送给消费者,但是还没收到消费者 ack 的消息数量

当把 autoAck 设置为 false 时,对于 RabbitMQ 来说,消费消息分成了两个部分:

待消费的消息已经投递给消费者,但是还没有被消费者确认的消息

换言之,当设置 autoAck 为 false 时,消费者将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack, RabbitMQ收到确认的ack才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到消费者的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费

确保消息被成功消费有两种方式:手动 Ack 或者自动 Ack,无论哪一种,最终都有可能导致消息被重复消费,所以还需要在处理消息时,解决幂等性问题

3.3 消息拒绝

当消费者接收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息:

@Componentpublic class Consumer { @RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE) public void consume(Channel channel, Message message) { // 第一步:获取消息编号 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 第二步:拒绝消息 channel.basicReject(deliveryTag, true); } catch (IOException e) { e.printStackTrace(); } }}

上面代码调用 basicReject 方法来拒绝消费消息,方法第二个参数为布尔值,表示是否将消息重新放入队列,为 true,被拒绝的消息会重新进入到消息队列中,等待下一次被消费;为 false,被拒绝的消息就会被丢掉,不会有新的消费者去消费它,需要注意的是,basicReject 方法一次只能拒绝一条消息

3.4 消息确认

消息确认分为自动确认和手动确认

3.4.1 自动确认

默认情况下,消息消费就是自动确认的,下面的消费方法已经出现过很多次了

@Slf4j@Componentpublic class Consumer{ @RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE) public void consume(String msg) { log.info("收到的消息为:{}",msg); int i = 1 / 0; }}

通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了

3.4.2 手动确认

手动确认又可以分为两种:推模式手动确认与拉模式手动确认

3.4.2.1 推模式手动确认

开启手动确认,需要在配置文件中开启:

# 表示将消息的确认模式改为手动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual

消息消费者:

@RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE)public void consume(Message message,Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 消费消息 String s = new String(message.getBody()); log.info("收到的消息为:{} ",s); // 消费完成后,手动 ack channel.basicAck(deliveryTag, false); } catch (Exception e) { //手动 nack try { channel.basicNack(deliveryTag, false, true); } catch (IOException ex) { ex.printStackTrace(); } }}

上面的代码进行了异常处理,如果消息正常消费成功,会执行 basicAck 方法完成确认

如果消息消费失败,会进入异常处理中,执行 basicNack 方法,告诉 RabbitMQ 消息消费失败

basicAck():表示手动确认消息已经成功消费,该方法有两个参数:参数一表示消息的 id,参数二 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功basicNack():表示当前消息未被成功消费,该方法有三个参数:前两个参数意义同上,第三个参数 requeue 上面也解释过,被拒绝的消息是否重新入队

当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题

3.4.2.2 拉模式手动确认

拉模式手动 ack 比较麻烦,在Spring中封装的 RabbitTemplate 中并未找到对应的方法,所以需要用原生方法:

public void consume() { Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); long deliveryTag = 0L; try { GetResponse getResponse = channel.basicGet(RabbitConfig.SCORPIOS_MSG_QUEUE, false); deliveryTag = getResponse.getEnvelope().getDeliveryTag(); log.info("接受到的消息为:{}",new String((getResponse.getBody()), "UTF-8")); channel.basicAck(deliveryTag, false); } catch (IOException e) { try { channel.basicNack(deliveryTag, false, true); } catch (IOException ex) { ex.printStackTrace(); } }}

3.5 幂等性问题

上面说过,确保消息被成功消费有两种方式:手动 Ack 或者自动 Ack,但无论哪一种,最终都有可能导致消息被重复消费,这样就存在解决幂等性问题。

幂等性指的是多次操作,结果是一致的,比如多次操作数据库数据是一致的,幂等性是分布式环境下常见的问题。

消息被重复消费情况:

消费者在消费完一条消息后,向 RabbitMQ发送一个 ack确认,如果此时网络断开或者其他原因导致 RabbitMQ并没有收到这个确认ack,RabbitMQ并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同样,消息在发送的时候,同一条消息也可能会发送两次。

幂等性问题基本上都可以从业务上来处理,常见的解决幂等性的方式有以下:

唯一索引:保证插入的数据只有一条,可以用Redis实现Token机制:每次接口请求前先获取一个token,然后再下次请求的时候在请求的header体中加上这个token,后台进行验证,如果验证通过删除token,下次请求再次判断token悲观锁或者乐观锁:悲观锁可以保证每次for update时其他sql无法update数据(在数据库引擎是innodb的时候,select的条件必须是唯一索引,防止锁全表)【分布锁思路】先查询后判断:首先通过查询数据库是否存在数据,如果存在证明已经请求过了,直接拒绝该请求,如果没有存在,就证明是第一次进来,直接放行

代码地址:https://github.com/Hofanking/springboot-rabbitmq-example

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。