import com.example.rabbitmq.utils.rabbitmqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;public class consumer9 { //普通交换机 private final static String NORMAN_EXCHANG_NAME = "norman_exchang1"; //死信交换机 private final static String DEAD_EXCHANG_NAME = "dead_exchang2"; //普通队列 private final static String NORMAN_QUEUE_NAME = "norman_queue1"; //死信队列 private final static String DEAD_QUEUE_NAME = "dead_queue2"; public static void main(String[] args) throws IOException, TimeoutException { //获取Channel对象 Channel channel = rabbitmqUtils.rabbitmqFactory(); //声明交换机 channel.exchangeDeclare(DEAD_EXCHANG_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(NORMAN_EXCHANG_NAME, BuiltinExchangeType.DIRECT); Map
import com.example.rabbitmq.utils.rabbitmqUtils;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.concurrent.TimeoutException;public class producer4 { //普通交换机 private final static String NORMAN_EXCHANG_NAME = "norman_exchang1"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = rabbitmqUtils.rabbitmqFactory(); //过期时间 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build(); //死信消息,设置ttl时间(存活时间) for (int i = 0; i <= 10; i++) { String info = "info"+i; channel.basicPublish(NORMAN_EXCHANG_NAME,"zhangsan",basicProperties ,info.getBytes()); System.out.println("消息发送"+info); } }}
死信队列消费者import com.example.rabbitmq.utils.rabbitmqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;public class consumer10 { //死信队列 private final static String DEAD_QUEUE_NAME = "dead_queue2"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = rabbitmqUtils.rabbitmqFactory(); System.out.println("等待消息接收。。。"); DeliverCallback deliverCallback = (tag, msg)->{ System.out.println("死信的队列接收到的消息:"+new String(msg.getBody())); channel.basicAck(msg.getEnvelope().getDeliveryTag(),false); }; channel.basicConsume(DEAD_QUEUE_NAME, false,deliverCallback, consumerTag->{ System.out.println(consumerTag+"取消了"); }); }}
模拟
首先我们启动consumer9中的main方法, 创建普通交换机、死信交换机、普通队列、死信队列。
第二步是关闭consumer9中的main方法,模拟故障。
第三步启动producer4中的main方法,发送消息,一共发送了11条消息。
过了10秒后,数据到了死信队里中,看下图:
现在我们启动死信队列消费者,就可以看到最后数据都由死信队列消费
SpringBoot整合Rabbitmq 死信队列实例
1.X是普通交换机,Y是死信交换机。QA、QB、QD是队列,不过QA和QB是普通队列,QD是死信队列。XA、XB、YD是RoutingKey,也就是路由的key。
2.那么我们发送消息发送X普通交换机,通过RoutingKey XA指向QA队列,在队列中停留10秒,自动把消息给到死信队列QD。那么同理如果RoutingKey XB指向QB则要停留40秒才把消息抛弃给死信队列。
包这里有个坑,如果整合最后启动不了可以试试换一下版本,那么我这个没有看到version,这是因为parent中已经定义了,在parent中改就行了,如果只是想改rabbitmq的版本,可以在dependency里面加version。
application-test.yml (在application.properties中指定这个配置文件 spring.profiles.active=test)spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
config包下TtlQueueConfig.java,这个主要是定义交换机和队列做绑定关系import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class TtlQueueConfig { //普通交换机 private final static String NORMAN_EXCHANG_NAME_X= "X"; //死信交换机 private final static String DEAD_EXCHANG_NAME_Y= "Y"; //普通队列 private final static String NORMAN_QUEUE_NAME_A = "QA"; private final static String NORMAN_QUEUE_NAME_B = "QB"; //死信队列 private final static String DEAD_QUEUE_NAME = "QD"; @Bean("X") DirectExchange xExchang(){ return new DirectExchange(NORMAN_EXCHANG_NAME_X); } @Bean("Y") DirectExchange yExchang(){ return new DirectExchange(DEAD_EXCHANG_NAME_Y); } @Bean("QA") Queue aQueue(){ Map
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j@RestController@RequestMapping("/ttl")public class sendMessgeController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg{msg}") public void sendMsg(String msg){ Date date = new Date(); log.info("当前时间:{},发送一条消息给两个ttl队列:{}",date ,msg); rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列"+msg); }}
死信队列消费者import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;@Slf4j@Componentpublic class QueueConsumer { @RabbitListener(queues = "QD") void receive(Message message, Channel channel){ String msg = new String(message.getBody()); log.info("当前时间{},消息为{}", new Date(),msg); }}
启动项目的时候查看一下交换机和队列是否生成,如图
没有问题,我们就通过我们sendMessgeController中sendMsg发送发送一条消息:666
相隔十秒,没有问题,如果我们想用40秒过期的那个队列只需要在生产者convertAndSend中第二个参数routingKey改为XB就指向QB队列了。