@Configurationpublic class RabbitmqConfig { @Bean DirectExchange directExchange(){ return new DirectExchange("direct_change"); } @Bean Queue queue1(){ return new Queue("queue",true,false,false); } @Bean Binding binding(){ return BindingBuilder.bind(queue1()).to(directExchange()).with("a"); } @Bean RabbitTransactionManager rabbitTransactionManager(ConnectionFactory factory){ return new RabbitTransactionManager(factory); } @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; }}
@Autowired RabbitTemplate rabbitTemplate; @Transactional(rollbackFor = Exception.class) public void sendMsg(){ rabbitTemplate.convertAndSend("direct_change","a","hello queue"); }
1.2 消息确认机制spring: rabbitmq: virtual-host: / host: 127.0.0.1 port: 5672 username: guest password: guest publisher-/confirm/i-type: correlated publisher-returns: true
@Configurationpublic class RabbitConfig implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback { public static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.set/confirm/iCallback(this::/confirm/i); rabbitTemplate.setReturnsCallback(this::returnedMessage); } @Bean DirectExchange directExchange(){ return new DirectExchange("exchange_name",true,false); } @Bean Queue queue1(){ return new Queue("queue_name",true,false,false); } @Bean Binding binding(){ return BindingBuilder.bind(queue1()).to(directExchange()).with("a"); } @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { if(ack){ logger.info("{}消息成功到达交换机",correlationData.getId()); }else { logger.info("{}消息未到达交换机,{}",correlationData.getId(),cause); } } @Override public void returnedMessage(ReturnedMessage returned) { logger.info("消息未到达队列"); }}
二、消息失败重试 2.1 通过配置rabbit实现消息失败重试spring: rabbitmq: template: retry: ## 启用重试 enabled: true ## 重试时间间隔 initial-interval: 1000ms ## 最大重试次数 max-attempts: 5 ## 重试时间累加 如第一次 1000ms 第二次就是1200 以此类推 multiplier: 1.2 ## 最大重试间隔 max-interval: 5000ms
2.2 业务重试机制 针对消息没有成功到交换机,针对这种情况,回调做相关的业务逻辑即可.
具体思路:
可以使用数据库在每次发送消息的时候写入库里给此条消息增加一个状态,在交换机回调处,做相应的处理逻辑.
1.收到失败回调对此消息做数据库的修改,是重试几次后,还是发送失败那么就不在重新发送此条消息,改变此条消息的状态即可。
2.如果消息收到此条消息发送成功就改变次条消息的状态。
3.当然了,发送失败的消息,重试次数超过后,定时器去找失败的消息,做业务处理即可。
注意:这里会有消息重复发送的情况,在消费方做好消息的幂等性即可。