RabbitMQ消息确定主要分为两部分,消息发送确定和消息接收确定(ACK)。
消息发送路径 消息路径 producter -> rabbitmq broker -> exchange -> queue -> consumer消息从生产者到Broker,则会触发/confirm/iCallBack回调消息从exchange到Queue,投递失败则会调用returnCallBack 消息发送确定 消息发送确定1
消息从生产者到 Broker有一个 /confirm/iCallback确认模式,当消息被Broker接收,无论成功或失败都会触发,/confirm/iCallback回调。
配置开启/confirm/iCallback
publisher-/confirm/i-type: 表示确认消息的类型,分别有none、correlated、simple这三种类型。publisher-/confirm/i-type: none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发/confirm/iCallback回调。publisher-/confirm/i-type: correlated:表示消息成功到达Broker后触发/confirm/iCalllBack回调publisher-/confirm/i-type: simple:simple模式下如果消息成功到达Broker后一样会触发/confirm/iCalllBack回调,发布消息成功后使用rabbitTemplate调用waitFor/confirm/is或waitFor/confirm/isOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,注意:waitFor/confirm/isOrDie方法如果返回false则会关闭channel信道,则接下来无法发送消息到broker。
@Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("发送消息到broker成功"); }else{ log.error("发送消息到broker失败" + cause); } }
消息发送确定2消息从交换机到 队列 投递失败有一个ReturnCallback回退模式,可以通过重写returnedMessage方法来做消息重发等补偿操作。
publisher-returns: true ,true表示开启失败回调,开启后当消息无法路由到指定队列时会触发ReturnCallback回调。
@Override public void returnedMessage(ReturnedMessage returned) {log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message); }
消息接收确定(ACK)消息确定三种模式
手动确认 listener.simple.acknowledge-mode: manual,在该模式下,消费者消费消息后需要根据消费情况给Broker返回一个回执,是确认ack使Broker删除该条已消费的消息,还是失败确认返回nack,还是拒绝该消息。开启手动确认后,如果消费者接收到消息后还没有返回ack就宕机了,这种情况下消息也不会丢失,只有RabbitMQ接收到返回ack后,消息才会从队列中被删除。该模式下有三种确认方式,后面会介绍。
自动确认 listener.simple.acknowledge-mode: none,rabbitmq默认消费者正确处理所有请求。(不设置时的默认方式)
根据请况确认 listener.simple.acknowledge-mode: auto,主要分成以下几种情况:
如果消费者在消费的过程中没有抛出异常,则自动确认。当消费者消费的过程中抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且该消息不会重回队列。当抛出ImmediateAcknowledgeAmqpException异常,消息会被确认。如果抛出其他的异常,则消息会被拒绝,但是与前两个不同的是,该消息会重回队列,如果此时只有一个消费者监听该队列,那么该消息重回队列后又会推送给该消费者,会造成死循环的情况。
消费端手动确定
basicAck(long deliveryTag, boolean multiple) basicAck方法表示成功确认,使用此方法后,消息会被rabbitmq broker删除,其中参数long deliveryTag为消息的唯一序号,boolean multiple表示是否一次消费多条消息,false表示只确认该序列号对应的消息,true则表示确认该序列号对应的消息以及比该序列号小的所有消息,比如我先发送2条消息,他们的序列号分别为2,3,并且他们都没有被确认,还留在队列中,那么如果当前消息序列号为4,那么当multiple为true,则序列号为2、3的消息也会被一同确认。
basicNack(long deliveryTag, boolean multiple, boolean requeue) basicNack方法表示失败确认,一般当我们消费消息时出现异常用到此方法,可以通过参数requeue设置是否将消息重新投递到队列。requeue表示消息是否重回队列,如果requeue=false表示消息不重回队列并且丢弃该消息,如果为requeue=true则消息重回队列。
basicReject(long deliveryTag, boolean requeue)basicReject方法表示拒绝消息,requeue=false表示被拒绝的消息会被丢弃,requeue=true表示消息会重回队列,该方法与basicReject方法的区别就是不支持multiple批量确认。