欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ消息应答实战(针对自动|手动应答常见问题进行模拟)

时间:2023-06-18
消息应答概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

1.自动应答:RabbitMQ 只要将消息分发给消费者就被认为消息传递成功,就会将内存中的消息删除,而不管消费者有没有处理完消息。
2.手动应答、RabbitMQ 将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后并且手动发送ack,才会被认为消息传递成功了,然后才会将内存中的消息删除。

问题:
自动应答会在服务器宕机后丢失所有的消息。

模拟自动应答:

自动应答消费者:

public class ConsumerAutoAck { public static final String QUEUE_NAME = "test_auto_ack"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();//消费者接受此消息的回调函数 DeliverCallback deliverCallback = (consumerTag, message) -> { try { SleepUtils.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("低性能服务器接受:" + new String(message.getBody())); }; //消费者未接受此消息的回调函数 CancelCallback cancelCallback = consumerTag -> {}; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); }}

生产者:

public class Producer { public static final String QUEUE_NAME = "test_auto_ack"; public static final String EXCHANGE_NAME = "test_auto_ack"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"auto.ack"); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message = scanner.next(); System.out.println("发送消息为:" + message); channel.basicPublish(EXCHANGE_NAME,"auto.ack",null,message.getBytes(StandardCharsets.UTF_8)); } }}

当启动生产者后,发送分别发送消息1,2,3,4,观察rabbitmq 队列变化,再启动消费者观察队列变化。

1.发送4条消息

2.观察队列变化

Ready
Number of messages that are available to be delivered now.
队列中现在可以发送的消息(队列中的消息,在队列中排队,还未发送给消费者)
Unacknowledged
Number of messages for which the server is waiting for acknowledgement.
等待服务器发送确认消息队列数(即发送给了消费者,但是还没有收到确认的消息数)
Total
The total of these two numbers.
图中线条说明


test_auto_ack队列中有4条消息等待处理。

3.启动自动应答消费者
4.观察队列变化

4条消息瞬间被分发到自动应答消费者,同时消费者全部自动应答消息。
5.停止消费者

我使用睡眠来模拟低性能的服务器,从输出结果可以得出结论:
使用自动应答的话,如果服务器在处理消息途中突然宕机,那么这些正在传输消息都会凭空丢失,因为自动应答是在mq发送消息后,立马ack,但是我们低性能服务器还在处理之前的消息,这些消息还没有处理,这个是很不合理的。

模拟手动应答

1.手动应答是否批量应答
单个应答:

1.生产者发送4条消息

2.启动消费者后

3.等待消费者处理消息


消费者是睡眠30s后,进行确实消息发送ack,所以图中消费者确认(绿色的线)为30s间隔,进行一次。随着时间,消息不断被消费者一条条确认。

4.停止消费者后


从图中观察,有一条消息还没来得及处理(消费者已经接受了这条消息,但是还没有发送ack),但是这条消息又重新入队列,状态变为在队列中等待状态,当启动消费者后,这条消息会被redeliver给consumer。

整体流程分析:

当生产者publish message给队列,如果队列没有被消费者订阅,那么message会在队列中以ready形式存在,随时等待发送,如果这时启动了消费者,那么消息会全部给消费者,同时完成消息后发送ack,这样即使在处理某一消息时,该消费者宕机,那么这个消息未ack,那么这个消息会重新redeliver给其他的消费者。如果是单个手动应答消息的话,能保证每一个消息都完成,如果为批量手动应答,那么这个时候在消费者堆积的所有消息(信道中传输的消息)都会丢失,因为当消费者接受消息时,完成一个消息即应答所有进入消费者的消息。

此次模拟成功,手动应答消息未丢失。

2.批量应答

public class Consumer01 { public static final String QUEUE_NAME = "test_auto_ack"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,message) -> { try { SleepUtils.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("低性能服务器接受:" + new String(message.getBody())); //第二个参数为是否批量应答,设置为true channel.basicAck(message.getEnvelope().getDeliveryTag(),true); }; CancelCallback cancelCallback = consumerTag -> {}; channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); }}

1.发送4条消息
2.启动消费者后,等待第一条消息处理完后,立马关闭消费者。
3.观察队列变化

结论:

按照我的理解是,当消费者处理完第一条消息后,其余三条消息均发送ack,但是事实情况并不是这样,关闭消费者后,其他三条均为unacked,所以关闭消费者后,这三条消息并未丢失。
这个问题我在用中文搜索不到,清一色尚硅谷文档。。。 待后续解决 //TODO

// 消息tag,是否批量应答channel.basicAck(deliverTag,false);// 消息tag,是否批量应答,是否重新入队列channel.basicNack(deliverTag,false,true); // 不发送ack// 消息tag,是否重新入队列channel.basicReject(deliverTag,true); // 拒绝消息// 不发送ack和拒绝消息好像没有本质上的区别,均可重新入队列且可能又被当前消费者再次消费。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。