RabbitMQ: Simplifying Message Listening with the @RabbitListener Annotation

Date: 2023-05-13

pom.xml:

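    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.6.2</version>
        </parent>

        <packaging>jar</packaging>
        <groupId>com.kaven</groupId>
        <artifactId>springboot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springboot</name>
        <description>springboot</description>

        <properties>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>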

application.yml:

    spring:
      rabbitmq:
        addresses: 192.168.1.9:5672
        username: admin
        password: admin

User class (the entity carried as the message payload):

    package com.kaven.springboot.rabbitmq;

    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.Setter;
    import lombok.ToString;

    @Setter
    @Getter
    @ToString
    @AllArgsConstructor
    public class User {
        private String username;
        private String password;
        private String code;
    }

Json2UserMessageConverter class (a message converter that turns JSON into a User object; the JSON string is decoded from the byte[] of the message body):

    package com.kaven.springboot.rabbitmq;

    import com.google.gson.Gson;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.stereotype.Component;

    import java.nio.charset.StandardCharsets;

    @Component("json2UserMessageConverter")
    public class Json2UserMessageConverter implements MessageConverter {

        private static final Gson GSON = new Gson();

        // Outbound: serialize the object to JSON and use its UTF-8 bytes as the body.
        // The caller's properties are passed through instead of being dropped.
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8), messageProperties);
        }

        // Inbound: decode the body as UTF-8 JSON and map it onto a User.
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class);
        }
    }

Consumer class (the message listener; the @RabbitListener annotation keeps the listening code short):

    package com.kaven.springboot.rabbitmq;

    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;

    @Component
    public class Consumer {

        @RabbitListener(
                bindings = {
                        @QueueBinding(
                                value = @Queue("queue.user"),
                                exchange = @Exchange(value = "exchange.user", type = ExchangeTypes.TOPIC),
                                key = {"*.user"}
                        )
                },
                messageConverter = "json2UserMessageConverter"
        )
        public void process(User user) {
            // Prints "Consumer - process received message: " followed by the user.
            System.out.println("Consumer - process 接收消息: " + user);
        }
    }
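For comparison, here is a rough sketch of the declarations that the bindings attribute stands in for, written as explicit beans. The class name UserRabbitConfig and the bean method names are made up for illustration and do not appear in the original project; the queue, exchange, and binding key are the ones used in the annotation above.

```java
package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class: the hand-written counterpart of the
// @QueueBinding declared on Consumer.process.
@Configuration
public class UserRabbitConfig {

    // Durable, non-exclusive, non-auto-delete queue named "queue.user".
    @Bean
    public Queue userQueue() {
        return new Queue("queue.user");
    }

    // Durable topic exchange named "exchange.user".
    @Bean
    public TopicExchange userExchange() {
        return new TopicExchange("exchange.user");
    }

    // Bind the queue to the exchange with the binding key "*.user".
    @Bean
    public Binding userBinding(Queue userQueue, TopicExchange userExchange) {
        return BindingBuilder.bind(userQueue).to(userExchange).with("*.user");
    }
}
```

With beans like these, the listener would only need @RabbitListener(queues = "queue.user", messageConverter = "json2UserMessageConverter"). The annotation-based bindings make this class unnecessary.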

Producer class (publishes messages):

    package com.kaven.springboot.rabbitmq;

    import com.google.gson.Gson;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    @Component
    public class Producer {

        private static final Gson GSON = new Gson();

        // Injected through the constructor, so no @Resource is needed on the final field.
        private final RabbitTemplate rabbitTemplate;

        public Producer(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }

        public void sendMsg(User user) {
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            MessageProperties messageProperties = new MessageProperties();
            // Per-message TTL in milliseconds, passed as a string (10 seconds).
            messageProperties.setExpiration("10000");
            Message message = new Message(GSON.toJson(user).getBytes(StandardCharsets.UTF_8), messageProperties);
            // The routing key "new.user" matches the consumer's binding key "*.user".
            rabbitTemplate.send("exchange.user", "new.user", message, correlationId);
        }
    }
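The value passed to setExpiration is a per-message TTL expressed as a string of milliseconds, so "10000" means ten seconds. A small helper can make that unit explicit. The sketch below uses only the JDK; the class and method names (ExpirationSketch, toExpiration) are invented for illustration and are not part of Spring AMQP.

```java
import java.time.Duration;

// Hypothetical helper: converts a Duration into the millisecond string
// expected by MessageProperties.setExpiration.
final class ExpirationSketch {

    private ExpirationSketch() {
    }

    static String toExpiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        return Long.toString(ttl.toMillis());
    }

    public static void main(String[] args) {
        // Ten seconds, the same TTL the Producer sets.
        System.out.println(toExpiration(Duration.ofSeconds(10)));
    }
}
```

In the Producer, this would read messageProperties.setExpiration(ExpirationSketch.toExpiration(Duration.ofSeconds(10))).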

ProducerController class (the HTTP endpoint used to publish a message):

    package com.kaven.springboot.rabbitmq;

    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    import javax.annotation.Resource;

    @RestController
    public class ProducerController {

        @Resource
        private Producer producer;

        // The request parameters username, password and code are bound to the User argument.
        @GetMapping("/send")
        public String send(User user) {
            producer.sendMsg(user);
            // Response text: "data sent successfully".
            return "数据发送成功";
        }
    }

Application entry class:

    package com.kaven.springboot;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class SpringbootApplication {

        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(SpringbootApplication.class);
            application.run(args);
        }
    }

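Start the application and call the endpoint with Postman, for example GET /send?username=kaven&password=itkaven&code=908899 (the parameter values are taken from the output shown below).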
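If Postman is not at hand, the same request can be issued from plain Java. The sketch below is only an illustration: the base URL http://localhost:8080 assumes the application runs locally on Spring Boot's default port (the article does not state the host or port), and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Hypothetical client for the /send endpoint, using only the JDK (Java 8 compatible).
final class SendRequestSketch {

    private SendRequestSketch() {
    }

    // Builds the request URL, URL-encoding each parameter value.
    static String buildSendUrl(String baseUrl, String username, String password, String code) {
        return baseUrl + "/send"
                + "?username=" + encode(username)
                + "&password=" + encode(password)
                + "&code=" + encode(code);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM.
            throw new IllegalStateException(e);
        }
    }

    // Sends a GET request and returns the HTTP status code.
    static int get(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed base URL; adjust the host and port to your environment.
        String url = buildSendUrl("http://localhost:8080", "kaven", "itkaven", "908899");
        System.out.println("GET " + url + " -> " + get(url));
    }
}
```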

Console output:

Consumer - process 接收消息: User(username=kaven, password=itkaven, code=908899)
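The message reaches queue.user because the routing key new.user used by the Producer matches the binding key *.user declared on the Consumer. On a topic exchange, keys are split into dot-separated words, * stands for exactly one word, and # stands for zero or more words. The following is a simplified sketch of that matching rule in plain Java, written for illustration only; it is not RabbitMQ's actual implementation, and the class name is hypothetical.

```java
// Hypothetical, simplified model of topic-exchange key matching.
final class TopicPatternSketch {

    private TopicPatternSketch() {
    }

    // Returns true if the routing key satisfies the binding key.
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: match only if every word was consumed too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible length.
            for (int k = j; k <= words.length; k++) {
                if (matches(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // Pattern still expects a word but none are left.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            // '*' or a literal word consumes exactly one word.
            return matches(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.user", "new.user"));     // true
        System.out.println(matches("*.user", "new.vip.user")); // false: '*' covers one word only
        System.out.println(matches("#.user", "new.vip.user")); // true
    }
}
```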

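The result is as expected. The @RabbitListener annotation simplifies message listening: there is no need to define the exchange, queue, and binding beans by hand, because they are declared from the annotation and managed by Spring Boot. That is all for this post. If I have made a mistake or you see things differently, please leave a comment.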
