死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
一般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。
死信消息来源:
消息 TTL 过期
队列满了,无法再次添加数据
消息被拒绝(reject 或 nack),并且 requeue =false
代码编写:
编写可根据上篇博客来
交换机的使用
1,生产者创建队列和交换机
package com.lgs.scz.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration@SuppressWarnings("all")public class DeadConfig { @Bean public Queue normalQueue(){ Map
config=new HashMap<>(); //过期时间 config.put("x-message-ttl", 10000); //死信交换机 config.put("x-dead-letter-exchange", "deadExchange"); //死信routing key config.put("x-dead-letter-routing-key", "DD"); return new Queue("normalQueue",true,false,false,config); } @Bean public Queue deadQueue(){ return new Queue("deadQueue",true); } @Bean public DirectExchange normalExchange() { return new DirectExchange("normalExchange"); } @Bean public DirectExchange deadExchange() { return new DirectExchange("deadExchange"); } @Bean public Binding normalBinding() { return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("CC"); } @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DD"); }} 2,生产者发送信息
package com.lgs.scz.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@SuppressWarnings("all")@Slf4jpublic class ProviderController { @Autowired private RabbitTemplate template; @RequestMapping("/deadSend") public String deadSend(){ log.warn("订单已经保存"); template.convertAndSend("normalExchange","CC","order-1902"); return "yes"; }}
3,消费者接收信息
package com.lgs.xfz.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@SuppressWarnings("all")@RabbitListener(queues = "deadQueue")@Slf4jpublic class DeadReceiver { @RabbitHandler public void process(String message){ log.warn(message+":该订单已经过期"); }}
OK!到这就结束了,希望能帮到你!!!