欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ死信交换机&延迟队列

时间:2023-04-21
一,死信队列(延迟队列)

死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。

一般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。

死信消息来源:

消息 TTL 过期

队列满了,无法再次添加数据

消息被拒绝(reject 或 nack),并且 requeue =false

代码编写:

编写可根据上篇博客来

交换机的使用

1,生产者创建队列和交换机

package com.lgs.scz.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration@SuppressWarnings("all")public class DeadConfig { @Bean public Queue normalQueue(){ Map config=new HashMap<>(); //过期时间 config.put("x-message-ttl", 10000); //死信交换机 config.put("x-dead-letter-exchange", "deadExchange"); //死信routing key config.put("x-dead-letter-routing-key", "DD"); return new Queue("normalQueue",true,false,false,config); } @Bean public Queue deadQueue(){ return new Queue("deadQueue",true); } @Bean public DirectExchange normalExchange() { return new DirectExchange("normalExchange"); } @Bean public DirectExchange deadExchange() { return new DirectExchange("deadExchange"); } @Bean public Binding normalBinding() { return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("CC"); } @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DD"); }}

2,生产者发送信息

package com.lgs.scz.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@SuppressWarnings("all")@Slf4jpublic class ProviderController { @Autowired private RabbitTemplate template; @RequestMapping("/deadSend") public String deadSend(){ log.warn("订单已经保存"); template.convertAndSend("normalExchange","CC","order-1902"); return "yes"; }}

3,消费者接收信息

package com.lgs.xfz.mq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@SuppressWarnings("all")@RabbitListener(queues = "deadQueue")@Slf4jpublic class DeadReceiver { @RabbitHandler public void process(String message){ log.warn(message+":该订单已经过期"); }}

OK!到这就结束了,希望能帮到你!!!

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。