欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

rabbitmq死信队列(过期)

时间:2023-06-15
pom.xml

com.rabbitmq amqp-client 5.4.3

创建普通交换机、死信交换机、普通独立额、死信队列

import com.example.rabbitmq.utils.rabbitmqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;public class consumer9 { //普通交换机 private final static String NORMAN_EXCHANG_NAME = "norman_exchang1"; //死信交换机 private final static String DEAD_EXCHANG_NAME = "dead_exchang2"; //普通队列 private final static String NORMAN_QUEUE_NAME = "norman_queue1"; //死信队列 private final static String DEAD_QUEUE_NAME = "dead_queue2"; public static void main(String[] args) throws IOException, TimeoutException { //获取Channel对象 Channel channel = rabbitmqUtils.rabbitmqFactory(); //声明交换机 channel.exchangeDeclare(DEAD_EXCHANG_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(NORMAN_EXCHANG_NAME, BuiltinExchangeType.DIRECT); Map map =new HashMap<>(); //过期时间 //map.put("x-message-ttl",1000*10); //设置死信交换机 map.put("x-dead-letter-exchange",DEAD_EXCHANG_NAME); //设置正常队列的限制 //map.put("x-max-length",6); //拒绝 //设置死信RoutingKey map.put("x-dead-letter-routing-key","lisi"); //声明死信队列 channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,null); //声明普通队列 channel.queueDeclare(NORMAN_QUEUE_NAME,false,false,false,map); //普通队列和死信队列进行绑定 channel.queueBind(NORMAN_QUEUE_NAME,NORMAN_EXCHANG_NAME,"zhangsan"); channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANG_NAME,"lisi"); System.out.println("等待接收消息。。。"); DeliverCallback deliverCallback = (v1,v2)->{ System.out.println("普通队列接收的消息内容"+new String(v2.getBody())); }; channel.basicConsume(NORMAN_QUEUE_NAME,true,deliverCallback,tag->{}); }}

创建生产者

import com.example.rabbitmq.utils.rabbitmqUtils;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.concurrent.TimeoutException;public class producer4 { //普通交换机 private final static String NORMAN_EXCHANG_NAME = "norman_exchang1"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = rabbitmqUtils.rabbitmqFactory(); //过期时间 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build(); //死信消息,设置ttl时间(存活时间) for (int i = 0; i <= 10; i++) { String info = "info"+i; channel.basicPublish(NORMAN_EXCHANG_NAME,"zhangsan",basicProperties ,info.getBytes()); System.out.println("消息发送"+info); } }}

死信队列消费者

import com.example.rabbitmq.utils.rabbitmqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;public class consumer10 { //死信队列 private final static String DEAD_QUEUE_NAME = "dead_queue2"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = rabbitmqUtils.rabbitmqFactory(); System.out.println("等待消息接收。。。"); DeliverCallback deliverCallback = (tag, msg)->{ System.out.println("死信的队列接收到的消息:"+new String(msg.getBody())); channel.basicAck(msg.getEnvelope().getDeliveryTag(),false); }; channel.basicConsume(DEAD_QUEUE_NAME, false,deliverCallback, consumerTag->{ System.out.println(consumerTag+"取消了"); }); }}

模拟

首先我们启动consumer9中的main方法, 创建普通交换机、死信交换机、普通队列、死信队列。

第二步是关闭consumer9中的main方法,模拟故障。

第三步启动producer4中的main方法,发送消息,一共发送了11条消息。

过了10秒后,数据到了死信队里中,看下图:

现在我们启动死信队列消费者,就可以看到最后数据都由死信队列消费
SpringBoot整合Rabbitmq 死信队列实例


1.X是普通交换机,Y是死信交换机。QA、QB、QD是队列,不过QA和QB是普通队列,QD是死信队列。XA、XB、YD是RoutingKey,也就是路由的key。
2.那么我们发送消息发送X普通交换机,通过RoutingKey XA指向QA队列,在队列中停留10秒,自动把消息给到死信队列QD。那么同理如果RoutingKey XB指向QB则要停留40秒才把消息抛弃给死信队列。

废话少说,开始整活 创建springboot项目,这个我就不写了,不了解自己百度去 pom.xml

org.springframework.boot spring-boot-starter-amqp

包这里有个坑,如果整合最后启动不了可以试试换一下版本,那么我这个没有看到version,这是因为parent中已经定义了,在parent中改就行了,如果只是想改rabbitmq的版本,可以在dependency里面加version。

application-test.yml (在application.properties中指定这个配置文件 spring.profiles.active=test)

spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest

config包下TtlQueueConfig.java,这个主要是定义交换机和队列做绑定关系

import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class TtlQueueConfig { //普通交换机 private final static String NORMAN_EXCHANG_NAME_X= "X"; //死信交换机 private final static String DEAD_EXCHANG_NAME_Y= "Y"; //普通队列 private final static String NORMAN_QUEUE_NAME_A = "QA"; private final static String NORMAN_QUEUE_NAME_B = "QB"; //死信队列 private final static String DEAD_QUEUE_NAME = "QD"; @Bean("X") DirectExchange xExchang(){ return new DirectExchange(NORMAN_EXCHANG_NAME_X); } @Bean("Y") DirectExchange yExchang(){ return new DirectExchange(DEAD_EXCHANG_NAME_Y); } @Bean("QA") Queue aQueue(){ Map map = new HashMap(); //死信交换机 map.put("x-dead-letter-exchange",DEAD_EXCHANG_NAME_Y); //RoutingKey map.put("x-dead-letter-routing-key","YD"); //过期时间ttl map.put("x-message-ttl",10000); return QueueBuilder.durable(NORMAN_QUEUE_NAME_A).withArguments(map).build(); } @Bean("QB") Queue bQueue(){ Map map = new HashMap(); //死信交换机 map.put("x-dead-letter-exchange",DEAD_EXCHANG_NAME_Y); //RoutingKey map.put("x-dead-letter-routing-key","YD"); //过期时间ttl map.put("x-message-ttl",40000); return QueueBuilder.durable(NORMAN_QUEUE_NAME_B).withArguments(map).build(); } @Bean("QD") Queue dead_queue(){ return QueueBuilder.durable(DEAD_QUEUE_NAME).build(); } @Bean Binding queueABinding(@Qualifier("QA")Queue qa,@Qualifier("X")DirectExchange X){ return BindingBuilder.bind(qa).to(X).with("XA"); } @Bean Binding queueBBinding(@Qualifier("QB")Queue qb,@Qualifier("X")DirectExchange X){ return BindingBuilder.bind(qb).to(X).with("XB"); } @Bean Binding queueDBinding(@Qualifier("QD")Queue qd,@Qualifier("Y")DirectExchange Y){ return BindingBuilder.bind(qd).to(Y).with("YD"); }}

controller包中sendMessgeController.java,消息生成者。

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j@RestController@RequestMapping("/ttl")public class sendMessgeController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg{msg}") public void sendMsg(String msg){ Date date = new Date(); log.info("当前时间:{},发送一条消息给两个ttl队列:{}",date ,msg); rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列"+msg); }}

死信队列消费者

import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;@Slf4j@Componentpublic class QueueConsumer { @RabbitListener(queues = "QD") void receive(Message message, Channel channel){ String msg = new String(message.getBody()); log.info("当前时间{},消息为{}", new Date(),msg); }}

启动项目的时候查看一下交换机和队列是否生成,如图


没有问题,我们就通过我们sendMessgeController中sendMsg发送发送一条消息:666

相隔十秒,没有问题,如果我们想用40秒过期的那个队列只需要在生产者convertAndSend中第二个参数routingKey改为XB就指向QB队列了。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。