下载:
由于rabbitmq是erlang来写的,所以需要安装erlang环境:
https://www.erlang-solutions.com/resources/download.html
安装:
yum -y install esl-erlang_23.0.2-1_centos_7_amd64.rpm
检测erlang:
erl
第二步安装RabbitMQ下载:
http://www.rabbitmq.com/download.html
安装rabbitmq:
yum -y install rabbitmq-server-3.8.5-1.el7.noarch.rpm
查看rabbitmq的插件:rabbitmq-plugins list使用命令安装rabbitmq管理插件:rabbitmq-plugins enable rabbitma_management启动rabbitmq:systemctl start rabbitmq-server.service查看rabbitmq状态:systemctl status rabbitmq-server.service访问ip:15672用户名密码默认guest出现警告:User can only log in via localhost解决方法:cd /etc/rabbitmq/创建: vim rabbitmq.config添加:[{rabbit,[{loopback_users,[]}]}].重启rabbitmq:systemctl restart rabbitmq-server.service
记得开放服务器防火墙和安全组的端口号!!!
管控台插件页面:
创建springboot项目添加依赖:
application.yml添加配置:
#RabbitMq rabbitmq: #服务器 host: *.*.*.* #用户名 username: guest #密码 password: guest #虚拟主机 virtual-host: / #端口 port: 5672 listener: simple: #消费者最小数量 concurrency: 10 #消费者最大数量 max-concurrency: 10 #限制消费者每次只处理一条消息,处理完再继续下一条消息 prefetch: 1 #启动时是否默认启动容器,默认true auto-startup: true #被拒绝时重新进入队列 default-requeue-rejected: true template: retry: #发布重试,默认false enabled: true #重试时间,默认1000ms initial-interval: 1000ms #重试最大次数,默认3次 max-attempts: 3 #重试最大间隔时间 max-interval: 10000ms #重试的间隔乘数 比如2.0 第一次就等10s 第二次就等20s,第三次就等40s multiplier: 1
hello入门案例:创建rabbitmq配置类:RabbitMQConfig
import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig { @Bean public Queue queue(){ return new Queue("queue",true); }}
创建生产者:MQSender
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MQSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(Object msg){ log.info("发送消息"+msg); rabbitTemplate.convertAndSend("queue",msg); }}
创建接收者:MQReceiver
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MQReceiver { @RabbitListener(queues = "queue") public void receive(Object msg){ log.info("接收消息:"+ msg); }}
编写测试接口:
@RequestMapping("/mq")@ResponseBodypublic void mq(){mqSender.send("Hello");}
在rabbitmq控制台就会有数据显示:
RabbitMQConfig配置类:
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig { //fanout模式创建队列与交换机 private static final String Queue01="queue_fanout01"; private static final String Queue02="queue_fanout02"; private static final String EXCHANGE="fanoutExchange"; @Bean public Queue queue(){ return new Queue("queue",true); } //创建队列与交换机实例 @Bean public Queue queue01(){ return new Queue(Queue01); } @Bean public Queue queue02(){ return new Queue(Queue02); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE); } //交换机与队列进行绑定 @Bean public Binding binding01(){ return BindingBuilder.bind(queue01()).to(fanoutExchange()); } @Bean public Binding binding02(){ return BindingBuilder.bind(queue02()).to(fanoutExchange()); }}
生产者MQSender 生产者向交换机发送消息:
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MQSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(Object msg){ log.info("发送消息"+msg); rabbitTemplate.convertAndSend("queue",msg); } public void send02(Object msg){//使用交换机向队列推送消息 log.info("发送消息"+msg); rabbitTemplate.convertAndSend("fanoutExchange","",msg); }}
接收者MQReceiver :
@Service@Slf4jpublic class MQReceiver { @RabbitListener(queues = "queue") public void receive(Object msg){ log.info("接收消息:"+ msg); } //获取队列中的消息 @RabbitListener(queues = "queue_fanout01") public void receive01(Object msg){ log.info("QUEUE01接收消息:"+msg); } //获取队列中的消息 @RabbitListener(queues = "queue_fanout02") public void receive02(Object msg){ log.info("QUEUE接收消息:"+msg); }}
测试:
@RequestMapping("/mq/fanout") @ResponseBody public void mq01() { mqSender.send02("Hello"); }
交换机:
队列: