欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ安装启动以及与springboot整合

时间:2023-05-14
第一步安装erlang环境:

下载:
由于rabbitmq是erlang来写的,所以需要安装erlang环境:
https://www.erlang-solutions.com/resources/download.html
安装:

yum -y install esl-erlang_23.0.2-1_centos_7_amd64.rpm

检测erlang:

erl

第二步安装RabbitMQ

下载:

http://www.rabbitmq.com/download.html

安装rabbitmq:

yum -y install rabbitmq-server-3.8.5-1.el7.noarch.rpm

查看rabbitmq的插件:rabbitmq-plugins list使用命令安装rabbitmq管理插件:rabbitmq-plugins enable rabbitma_management启动rabbitmq:systemctl start rabbitmq-server.service查看rabbitmq状态:systemctl status rabbitmq-server.service访问ip:15672用户名密码默认guest出现警告:User can only log in via localhost解决方法:cd /etc/rabbitmq/创建: vim rabbitmq.config添加:[{rabbit,[{loopback_users,[]}]}].重启rabbitmq:systemctl restart rabbitmq-server.service

记得开放服务器防火墙和安全组的端口号!!!
管控台插件页面:

第三步整合springboot

创建springboot项目添加依赖:

org.springframework.boot spring-boot-starter-amqp

application.yml添加配置:

#RabbitMq rabbitmq: #服务器 host: *.*.*.* #用户名 username: guest #密码 password: guest #虚拟主机 virtual-host: / #端口 port: 5672 listener: simple: #消费者最小数量 concurrency: 10 #消费者最大数量 max-concurrency: 10 #限制消费者每次只处理一条消息,处理完再继续下一条消息 prefetch: 1 #启动时是否默认启动容器,默认true auto-startup: true #被拒绝时重新进入队列 default-requeue-rejected: true template: retry: #发布重试,默认false enabled: true #重试时间,默认1000ms initial-interval: 1000ms #重试最大次数,默认3次 max-attempts: 3 #重试最大间隔时间 max-interval: 10000ms #重试的间隔乘数 比如2.0 第一次就等10s 第二次就等20s,第三次就等40s multiplier: 1

hello入门案例:

创建rabbitmq配置类:RabbitMQConfig

import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig { @Bean public Queue queue(){ return new Queue("queue",true); }}

创建生产者:MQSender

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MQSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(Object msg){ log.info("发送消息"+msg); rabbitTemplate.convertAndSend("queue",msg); }}

创建接收者:MQReceiver

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MQReceiver { @RabbitListener(queues = "queue") public void receive(Object msg){ log.info("接收消息:"+ msg); }}

编写测试接口:

@RequestMapping("/mq")@ResponseBodypublic void mq(){mqSender.send("Hello");}


在rabbitmq控制台就会有数据显示:

fanout模式

RabbitMQConfig配置类:

import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig { //fanout模式创建队列与交换机 private static final String Queue01="queue_fanout01"; private static final String Queue02="queue_fanout02"; private static final String EXCHANGE="fanoutExchange"; @Bean public Queue queue(){ return new Queue("queue",true); } //创建队列与交换机实例 @Bean public Queue queue01(){ return new Queue(Queue01); } @Bean public Queue queue02(){ return new Queue(Queue02); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE); } //交换机与队列进行绑定 @Bean public Binding binding01(){ return BindingBuilder.bind(queue01()).to(fanoutExchange()); } @Bean public Binding binding02(){ return BindingBuilder.bind(queue02()).to(fanoutExchange()); }}

生产者MQSender 生产者向交换机发送消息:

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MQSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(Object msg){ log.info("发送消息"+msg); rabbitTemplate.convertAndSend("queue",msg); } public void send02(Object msg){//使用交换机向队列推送消息 log.info("发送消息"+msg); rabbitTemplate.convertAndSend("fanoutExchange","",msg); }}

接收者MQReceiver :

@Service@Slf4jpublic class MQReceiver { @RabbitListener(queues = "queue") public void receive(Object msg){ log.info("接收消息:"+ msg); } //获取队列中的消息 @RabbitListener(queues = "queue_fanout01") public void receive01(Object msg){ log.info("QUEUE01接收消息:"+msg); } //获取队列中的消息 @RabbitListener(queues = "queue_fanout02") public void receive02(Object msg){ log.info("QUEUE接收消息:"+msg); }}

测试:

@RequestMapping("/mq/fanout") @ResponseBody public void mq01() { mqSender.send02("Hello"); }

交换机:

队列:

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。