欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Springboot使用RabbitMq延迟队列和死信队列

时间:2023-05-11
文章目录

前言一、业务解决方案

1.quartz定时器2.redis定时器3mq消息队列 二、RabbitMq延迟队列

1.延迟队列2.死信交换机

3监听器 总结


前言

在最近的项目中,结合minio文件服务器的一些特性。需要做一个分片上传的功能:用户上传文件到md5的桶下,合并文件后删除这个临时桶。会出现这样一种情况,用户上传文件传到一半就不再上传了,那么如何去删除,什么时候去删除时需要解决问题。


一、业务解决方案 1.quartz定时器

如果是单体项目,可以考虑使用quartz定时器。在创建桶的时候加入到定时任务里。

2.redis定时器

redis定时器需要修改配置文件,并且对redis进行监听,在创建桶时,设置过时时间,一旦时间超时,可以对key进行捕捉,最好对名字进行规范设计以便于业务

3mq消息队列

使用延迟队列和死信队列进行定时任务

这篇主要讲解mq的方式解决问题

二、RabbitMq延迟队列 1.延迟队列 延迟队列也是一个普通的队列,和普通的队列相比,他多了几个属性,比如:

1)延迟的时间:表示队列中消息的生命周期,在指定时间后,要么抛弃这个消息,要么投递到死信队列中
2)指定死信交换机:如果不希望丢弃这个消息,那么可以将这个过期的消息丢到死信队列中

定义一个延迟队列:

//桶延迟队列 @Bean(BUCKET_TTL_QUEUE) public Queue bucketTtlQueue(){ Map deadParamsMap = new HashMap<>(); // 设置死信队列的Exchange deadParamsMap.put("x-dead-letter-exchange",BUCKET_DEAD_EXCHANGE); //设置死信队列的RouteKey deadParamsMap.put("x-dead-letter-routing-key",BUCKET_DEAD_QUEUE); // 设置对接过期时间"x-message-ttl" deadParamsMap.put("x-message-ttl",60000*5);//5分钟 // 设置对接可以存储的最大消息数量 //deadParamsMap.put("x-max-length",10); return new Queue(BUCKET_TTL_QUEUE,true,false,false,deadParamsMap); }

延迟队列交换机

如上所说,延迟队列本就是一个普通的队列,如果你想更细粒的对他进行控制,那么需要绑定交换机,如果不绑定交换机,会绑定到默认交换机,在发送消息时,交换机写""就行,默认交换机为直连交换机;

我这里指定了延迟队列的交换机,因为没有做消息幂等性,所以采用直连交换机应对在集群下消息只被消费一次

//桶延迟交换机 @Bean(BUCKET_TTL_EXCHANGE) public DirectExchange bucketTtlExchange() { return new DirectExchange(BUCKET_TTL_EXCHANGE,true,false); } // 绑定 @Bean public Binding bucketTtlBinding() { return BindingBuilder.bind(bucketTtlQueue()) .to(bucketTtlExchange()) .with(BUCKET_TTL_QUEUE); }

2.死信交换机

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列

死信交换机也是普通交换机,他只是你指定接收过期消息的交换机而已

@Bean(BUCKET_DEAD_QUEUE) public Queue bucketDeadQueue() { //属性参数 队列名称 是否持久化 return new Queue(BUCKET_DEAD_QUEUE, true); } @Bean(BUCKET_DEAD_EXCHANGE) public DirectExchange bucketDeadExchange() { return new DirectExchange(BUCKET_DEAD_EXCHANGE,true,false); } @Bean public Binding bucketDeadBinding() { return BindingBuilder.bind(bucketDeadQueue()).to(bucketDeadExchange()).with(BUCKET_DEAD_QUEUE); }

3监听器

消息处理的逻辑,在消息过期后,送到死信交换机里,监听器监听到死信交换机的消息进行删除桶以及文件的业务逻辑处理

@Configurationpublic class BucketDeadConsumer { @Autowired private CachingConnectionFactory cachingConnectionFactory; @Autowired private MinioTemplate minioTemplate; @Bean public SimpleMessageListenerContainer BucketDeadListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); // 监听队列名 container.setQueueNames(MyMqConfig.BUCKET_DEAD_QUEUE); // 当前消费者数量 开启几个线程去处理数据 支持运行时动态修改 container.setConcurrentConsumers(5); // 最大消费者数量 , 消息堵塞太多的时候,会帮我自动扩展到我的最大消费者数量 container.setMaxConcurrentConsumers(10); // 是否重回队列 container.setDefaultRequeueRejected(true); // 手动确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置监听器 container.setMessageListener(new ChannelAwareMessageListener(){ @Override public void onMessage(Message message, Channel channel) throws IOException { // 消息的唯一性ID deliveryTag:该消息的index 自增长 long deliveryTag = message.getMessageProperties().getDeliveryTag(); byte[] messageBody = message.getBody(); String s = new String(messageBody); System.out.println("消息: " + s); System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue()); System.out.println("交换机: "+message.getMessageProperties().getReceivedExchange()); //删除桶 try { minioTemplate.removeBuckets(s, ""); channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); channel.basicReject(deliveryTag, false); } } }); return container; }}

注:我设置了消息发送和确认的回调函数,为什么没有触发这个函数?有知道了老铁能说说吗?因为我是从后台管理页面发的消息,没有通过rabbitteplate进行发送,不会是这个原因吧!


总结

业务的解决方法有太多种了,找到一个高可用以及简便的方法才是解决问题的关键

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。