消息丢失的情况
(1)生产者方面:生产者发送消息至MQ的过程中,数据丢失
(2)RabbitMQ方面:MQ收到消息后暂存在内存中,还没被消费,MQ自己就挂掉了,内存中的数据会全部丢失
(3)消费者方面:消费者刚拿到消息,还没处理就挂掉了,而MQ又以为消费者已经处理完,导致消息丢失
解决方法
1.配置文件中添加
# 消息已发送到交换机(Exchange)时返回
spring.rabbitmq.publisher-confirm-type=correlated
# 消息在未被队列收到的情况下返回
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
# 开启消息手动确认机制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2.config类配置
package com.example.demo.config;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configurationpublic class RabbitmqConfig { @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //发送到exchange时调用回调函数 rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() { @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { System.out.println("/confirm/iCallback:"+"相关数据:"+correlationData+" /confirm/iCallback:"+"确认情况:"+ack+" /confirm/iCallback:"+"原因:"+cause); } }); //设置消息抵达队列的失败回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("二位热热我热太热"); } }); return rabbitTemplate; } @Bean public void createNormalExchange(){ DirectExchange mydirect = new DirectExchange("mydirect3", true, false); amqpAdmin.declareExchange(mydirect); }}
3.监听类改写
package com.example.demo.service.impl;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumer for the {@code mybe} queue that acknowledges each delivery manually, so a
 * message is only removed from the queue after it has actually been processed.
 */
@Service
public class RabbitmqTest2Impl {

    /**
     * Handles one delivery from the {@code mybe} queue.
     *
     * <p>On success the delivery is acked. If the thread is interrupted while processing,
     * the delivery is nacked with requeue so it is not lost; on any other failure it is
     * nacked without requeue.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on, used for ack/nack
     * @throws Exception if sending the nack itself fails
     */
    @RabbitListener(queues = {"mybe"})
    public void getMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] body = message.getBody();
            Thread.sleep(2000);
            // Decode with an explicit charset instead of the platform default.
            String s = new String(body, StandardCharsets.UTF_8);
            System.out.println("消费消息完成"+s);
            // multiple=false: acknowledge only this delivery.
            channel.basicAck(deliveryTag, false);
        } catch (InterruptedException e) {
            // Processing was cut short, not failed: hand the message back to the queue,
            // then restore the interrupt flag for the calling container thread.
            try {
                channel.basicNack(deliveryTag, false, true);
            } finally {
                Thread.currentThread().interrupt();
            }
        } catch (Exception e) {
            // Any processing failure must still settle the delivery, otherwise it stays
            // unacknowledged. Arguments: deliveryTag identifies the delivery;
            // multiple=false rejects only this delivery; requeue=false does not put it
            // back on the queue.
            channel.basicNack(deliveryTag, false, false);
            e.printStackTrace();
        }
    }
}