一、RabbitMQ运作原理 二、/confirm/i确认模式介绍及实现消息可靠性投递
在使用RabbitMQ时,消息生产者发送消息时会出现消息丢失或者投递失败的现象;
RabbitMQ在消息投递可靠性方面提供了两种模式:
1.Confirm 确认模式 producter->exchange
2.Return 退回模式 exchange->queue
producer -> exchange
消息确认,生产者消息投递后,如果exchange收到消息,则会给生产者一个答应。
生产者接收应答,以确定消息成功发送到exchange。
spring: rabbitmq: publisher-/confirm/i-type: correlated
2.实现RabbitTemplate./confirm/iCallback接口,实现/confirm/i方法/confirm/i-type有none、correlated、simple这三种类型
none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发 /confirm/iCallback回调。correlated:表示消息成功到达Broker后触发/confirm/iCalllBack回调simple:simple模式下如果消息成功到达Broker后一样会触发
@Configurationpublic class RabbitConfig { @Autowired private CachingConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(converter()); // 消息是否成功发送到Exchange rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息成功发送到Exchange"); } else { //通过correlationData携带的id可定位哪条消息发送失败,做补发操作 log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause); } }); }}
3.生产者发送消息@Servicepublic class Producter { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; public void sendMessage(Dto Dto) throws JsonProcessingException { mailDto.setMsgId(RandomUtil.randomUUID()); String msgJson = objectMapper.writevalueAsString(Dto); Message message = MessageBuilder .withBody(msgJson.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化 .build(); CorrelationData correlationData = new CorrelationData(mailDto.getMsgId()); rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME,message,correlationData); }}
二、Return退回模式介绍和实现 exchange->queue
如果消息未能路由到目标队列则将触发回调 ReturnCallback
rabbitmq: publisher-/confirm/is: true publisher-returns: true
2.实现returnedMessage方法// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调rabbitTemplate.setMandatory(true);// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);});
三、消费端ACK手动确认机制1.修改配置文件,开启ACK手动确认消息确认三种模式:manual、none、auto
手动确认manual:消费消息后需要根据消费情况返回一个回执,成功手动调用channel.basicAck()手动签收,失败则调用channel.basicNack()方法拒收,让MQ重新发送该消息自动确认 none:默认消费者正确处理所有请求。(不设置时默认方式)根据情况确认 auto:不太常用
spring: rabbitmq: publisher-/confirm/i-type: correlated listener: simple: acknowledge-mode: manual
没有确认,消息为Unack状态 2.消费者接收消息并手动确认消息@Override@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE})public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { try{ System.out.println("业务处理流程,成功则ACK"); channel.basicAck(tag,true); }catch (Exception e){ System.out.println("业务处理流程,失败则NACK"); channel.basicNack(tag,false,true); }}