1.使用事务消息
通过对信道的设置实现:
//通知服务器开启事务模式,服务端会返回Tx.Select-Okchannel.txSelect();//发布消息channel.basicPublish();channel.txCommit();channel.txRollback();
消费者使用事务:
//手动提交ack,以事务提交或回滚为准autoAck=false;//不支持事务autoAck=true;
如果其中任一环节出现问题,就会抛出IOException异常。用户可以拦截异常进行事务回滚,或决定是否重复消息!事务消息会降低RabbitMQ的性能!!!
2.使用消息确认机制
发送方确认:
channel设置为/confirm/i模式,每条消息会被分配一个唯一id。消息投递成功,信道会发送ack给生产者,包含了id,回调/confirm/iCallback接口。如果发生错误导致消息丢失,发送nack给生产者,回调ReturnCallback接口。
ack和nack只有一个被触发,且只有一次,属于异步触发,可以继续发送消息。
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.set/confirm/iCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到id为:{}的消息", id); } else { log.info("交换机还未收到id为:{}的消息,由于原因:{}", id, cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息:{},被交换机:{}退回,退回原因:{},路由key:{}", new String(message.getBody()), exchange, replyText, routingKey); }
接收方确认:
声明队列时,指定autoAck=false,broker会等待消费者手动返回ack,才会删除消息,否则自动删除!broker的ack没有超时机制,只会判断链接是否断开,如果断开,消息会重新发送。
//消费成功后,是否要自动应答channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
RabbitMQ如何确保消息的发送和接收?发布确认模式。
二、项目实现流程保证消息的可靠性传输:
1.发送消息时,将当前消息数据存入数据库,设置投递状态为消息投递中!
2.开启消息确认回调机制,确认成功,更新投递状态为消息投递成功!
3.开启定时任务,重新投递失败的消息,重试超过3次,更新投递状态为投递失败!
幂等性保证:
消息被消费后,将msgId存入redis!当消息再次被消费时,先判断redis中是否已经存在msgId。若存在,则说明消息已被消费过,将不会被允许消费;若不存在,则说明消息从未被消费,会允许被消费。