pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
application.yml:
spring: rabbitmq: addresses: 192.168.1.9:5672 username: admin password: admin
User类(消息负载的实体类):
package com.kaven.springboot.rabbitmq;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.Setter;import lombok.ToString;@Setter@Getter@ToString@AllArgsConstructorpublic class User { private String username; private String password; private String code;}
Json2UserMessageConverter类(消息转换器,将json数据转换成User对象,json数据由消息体的byte[]生成):
package com.kaven.springboot.rabbitmq;import com.google.gson.Gson;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.support.converter.MessageConversionException;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Component("json2UserMessageConverter")public class Json2UserMessageConverter implements MessageConverter { private static final Gson GSON = new Gson(); @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8)); } @Override public Object fromMessage(Message message) throws MessageConversionException { return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class); }}
Consumer类(消息监听,使用@RabbitListener注解简化消息监听):
package com.kaven.springboot.rabbitmq;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.stereotype.Component;@Componentpublic class Consumer { @RabbitListener( bindings = { @QueueBinding( value = @Queue("queue.user"), exchange = @Exchange(value = "exchange.user", type = ExchangeTypes.TOPIC), key = {"*.user"} ) }, messageConverter = "json2UserMessageConverter" ) public void process(User user) { System.out.println("Consumer - process 接收消息: " + user); }}
Producer类(用于发布消息):
package com.kaven.springboot.rabbitmq;import com.google.gson.Gson;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.nio.charset.StandardCharsets;import java.util.UUID;@Componentpublic class Producer { private static final Gson GSON = new Gson(); @Resource private final RabbitTemplate rabbitTemplate; public Producer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMsg(User user) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("10000"); Message message = new Message(GSON.toJson(user).getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.send("exchange.user", "new.user", message, correlationId); }}
ProducerController类(用于发布消息的接口):
package com.kaven.springboot.rabbitmq;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestControllerpublic class ProducerController { @Resource private Producer producer; @GetMapping("/send") public String send(User user) { producer.sendMsg(user); return "数据发送成功"; }}
启动类:
package com.kaven.springboot;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class SpringbootApplication { public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringbootApplication.class); application.run(args); }}
启动应用,使用Postman请求接口。
控制台输出:
Consumer - process 接收消息: User(username=kaven, password=itkaven, code=908899)
效果符合预期,使用@RabbitListener注解简化了消息监听,不需要自己定义交换机、队列以及绑定关系等bean,将这些需要的bean全部交给Spring Boot来管理。博客就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。