欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ消息可靠性(一)

时间:2023-06-18
一、消息发送可靠性(推荐使用消息确认模式,事务方式效率低) 1.1 通过事务确保消息发送成功

@Configurationpublic class RabbitmqConfig { @Bean DirectExchange directExchange(){ return new DirectExchange("direct_change"); } @Bean Queue queue1(){ return new Queue("queue",true,false,false); } @Bean Binding binding(){ return BindingBuilder.bind(queue1()).to(directExchange()).with("a"); } @Bean RabbitTransactionManager rabbitTransactionManager(ConnectionFactory factory){ return new RabbitTransactionManager(factory); } @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; }}

@Autowired RabbitTemplate rabbitTemplate; @Transactional(rollbackFor = Exception.class) public void sendMsg(){ rabbitTemplate.convertAndSend("direct_change","a","hello queue"); }

1.2 消息确认机制

spring: rabbitmq: virtual-host: / host: 127.0.0.1 port: 5672 username: guest password: guest publisher-/confirm/i-type: correlated publisher-returns: true

@Configurationpublic class RabbitConfig implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback { public static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.set/confirm/iCallback(this::/confirm/i); rabbitTemplate.setReturnsCallback(this::returnedMessage); } @Bean DirectExchange directExchange(){ return new DirectExchange("exchange_name",true,false); } @Bean Queue queue1(){ return new Queue("queue_name",true,false,false); } @Bean Binding binding(){ return BindingBuilder.bind(queue1()).to(directExchange()).with("a"); } @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { if(ack){ logger.info("{}消息成功到达交换机",correlationData.getId()); }else { logger.info("{}消息未到达交换机,{}",correlationData.getId(),cause); } } @Override public void returnedMessage(ReturnedMessage returned) { logger.info("消息未到达队列"); }}

二、消息失败重试 2.1 通过配置rabbit实现消息失败重试

spring: rabbitmq: template: retry: ## 启用重试 enabled: true ## 重试时间间隔 initial-interval: 1000ms ## 最大重试次数 max-attempts: 5 ## 重试时间累加 如第一次 1000ms 第二次就是1200 以此类推 multiplier: 1.2 ## 最大重试间隔 max-interval: 5000ms

2.2 业务重试机制

针对消息没有成功到交换机,针对这种情况,回调做相关的业务逻辑即可.
具体思路:
可以使用数据库在每次发送消息的时候写入库里给此条消息增加一个状态,在交换机回调处,做相应的处理逻辑.
1.收到失败回调对此消息做数据库的修改,是重试几次后,还是发送失败那么就不在重新发送此条消息,改变此条消息的状态即可。
2.如果消息收到此条消息发送成功就改变次条消息的状态。
3.当然了,发送失败的消息,重试次数超过后,定时器去找失败的消息,做业务处理即可。
注意:这里会有消息重复发送的情况,在消费方做好消息的幂等性即可。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。