1. 安装rabbitmq 2. SpringBoot整合Rabbitmq案例
2.1 Direct模式(点对点) 2.2 Fanout模式(点对面) 2.3 Topic模式(点对面) 2.4 Headers模式(很少用到) 1.安装rabbitmq
可参考这篇文章，链接: https://www.jianshu.com/p/14ffe0f3db94
2.SpringBoot整合Rabbitmq案例 2.1 Direct模式(点对点) 1.docker上启动rabbitmq实例
2.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
3.application.properties
# RabbitMQ connection settings
# Broker host
spring.rabbitmq.host=192.168.244.135
# Broker port
spring.rabbitmq.port=5672
# Username
spring.rabbitmq.username=guest
# Password
spring.rabbitmq.password=guest
4.Direct模式配置文件
package com.yl.amqp.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DirectConfig { //定义队列 @Bean Queue directQueue() { return new Queue("yl-queue",true); } //定义Direct交换机 @Bean DirectExchange directExchange() { return new DirectExchange("yl-direct",true,false); } //将队列绑定到交换机上 @Bean Binding directBiding() { return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct"); }}
5.监听消息队列
package com.yl.amqp.receicer;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class DirectReceiver { // 监听接收消息 @RabbitListener(queues = "yl-queue") public void handler(String msg) { System.out.println(msg); }}
6.测试
1.Fanout模式配置
package com.yl.amqp.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FanoutConfig { @Bean Queue queueOne() { return new Queue("queue-one",true); } @Bean Queue queueTwo() { return new Queue("queue-two",true); } // 定义fanout交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("yl-fanout",true,false); } // 将队列绑定到交换机 @Bean Binding bindingOne() { return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } // 将队列绑定到交换机 @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); }}
2.监听消息
package com.yl.amqp.receicer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners for the Fanout example: one handler per queue bound to the fanout exchange.
 */
@Component
public class FanoutReceiver {

    /**
     * Receives a message from {@code queue-one} and prints it with a {@code handle1()} prefix.
     *
     * @param msg the message payload, delivered as a String
     */
    @RabbitListener(queues = "queue-one")
    public void handler1(String msg) {
        System.out.println("handle1()" + msg);
    }

    /**
     * Receives a message from {@code queue-two} and prints it with a {@code handle2()} prefix.
     *
     * @param msg the message payload, delivered as a String
     */
    @RabbitListener(queues = "queue-two")
    public void handle2(String msg) {
        System.out.println("handle2()" + msg);
    }
}
3.测试
1.Topic模式配置
package com.yl.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queues, exchange and bindings used by the Topic example.
 *
 * <p>Three queues are bound to the topic exchange {@code yl-topic}, each with its own
 * routing pattern: {@code huawei.#}, {@code apple.#} and {@code #.phone.#}.
 */
@Configuration
public class TopicConfig {

    /** Name of the topic exchange. */
    private static final String EXCHANGE_NAME = "yl-topic";

    /**
     * Defines the {@code huawei} queue.
     *
     * @return a queue named {@code huawei}, created with the durable flag set to {@code true}
     */
    @Bean
    Queue huawei() {
        return new Queue("huawei", true);
    }

    /**
     * Defines the {@code apple} queue.
     *
     * @return a queue named {@code apple}, created with the durable flag set to {@code true}
     */
    @Bean
    Queue apple() {
        return new Queue("apple", true);
    }

    /**
     * Defines the {@code phone} queue.
     *
     * @return a queue named {@code phone}, created with the durable flag set to {@code true}
     */
    @Bean
    Queue phone() {
        return new Queue("phone", true);
    }

    /**
     * Defines the topic exchange.
     *
     * @return an exchange named {@code yl-topic} with durable {@code true} and
     *         auto-delete {@code false}
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * Binds the {@code huawei} queue to the exchange.
     *
     * @return the binding using the routing pattern {@code huawei.#}
     */
    @Bean
    Binding huaweiBinding() {
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
    }

    /**
     * Binds the {@code apple} queue to the exchange.
     *
     * @return the binding using the routing pattern {@code apple.#}
     */
    @Bean
    Binding appleBinding() {
        return BindingBuilder.bind(apple()).to(topicExchange()).with("apple.#");
    }

    /**
     * Binds the {@code phone} queue to the exchange.
     *
     * @return the binding using the routing pattern {@code #.phone.#}
     */
    @Bean
    Binding phoneBinding() {
        return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");
    }
}
2.监听消息
package com.yl.amqp.receicer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners for the Topic example: one handler per queue bound to the topic exchange.
 */
@Component
public class TopicReceiver {

    /**
     * Receives a message from the {@code huawei} queue and prints it with a
     * {@code handle1()} prefix.
     *
     * @param msg the message payload, delivered as a String
     */
    @RabbitListener(queues = "huawei")
    public void handle1(String msg) {
        System.out.println("handle1()" + msg);
    }

    /**
     * Receives a message from the {@code apple} queue and prints it with a
     * {@code handle2()} prefix.
     *
     * @param msg the message payload, delivered as a String
     */
    @RabbitListener(queues = "apple")
    public void handle2(String msg) {
        System.out.println("handle2()" + msg);
    }

    /**
     * Receives a message from the {@code phone} queue and prints it with a
     * {@code handle3()} prefix.
     *
     * @param msg the message payload, delivered as a String
     */
    @RabbitListener(queues = "phone")
    public void handle3(String msg) {
        System.out.println("handle3()" + msg);
    }
}
3.测试1
4.测试2
1.Headers模式的配置
package com.yl.amqp.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.HeadersExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class HeaderConfig { @Bean Queue ageQueue() { return new Queue("queue-age",true); } @Bean Queue nameQueue() { return new Queue("queue-name",true); } @Bean HeadersExchange headersExchange() { return new HeadersExchange("yl-headers",true,false); } @Bean Binding ageBinding() { Map
2.监听消息
package com.yl.amqp.receicer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners for the Headers example: one handler for {@code queue-age} and one for
 * {@code queue-name}.
 */
@Component
public class HeadersReceiver {

    /**
     * Receives a message from {@code queue-age} and prints it with a
     * {@code handle1(),queue-age:} prefix.
     *
     * @param msg the message payload, delivered as a String
     */
    @RabbitListener(queues = "queue-age")
    public void handle1(String msg) {
        System.out.println("handle1(),queue-age:" + msg);
    }

    /**
     * Receives a message from {@code queue-name} and prints it with a
     * {@code handle2(),queue-name:} prefix.
     *
     * @param msg the message payload, delivered as a String
     */
    @RabbitListener(queues = "queue-name")
    public void handle2(String msg) {
        System.out.println("handle2(),queue-name:" + msg);
    }
}
3.测试1
4.测试2