欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Springboot安装整合RabbitMQ实现发布订阅

时间:2023-04-20
一、RabbitMQ概念

RabbitMQ是流行的开源消息队列系统,是AMQP(Advanced Message Queuing Protocol高级消息队列协议)的标准实现,用erlang语言开发。RabbitMQ据说具有良好的性能和时效性,同时还能够非常好的支持集群和负载部署,非常适合在较大规模的分布式系统中使用。

RabbitMQ的5种模式

1.不直接Exchange交换机(默认交换机)

simple简单模式:一个生产者生产一个消息到一个队列被一个消费者接收work工作队列模式:生产者发送消息到一个队列中,然后可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系2.使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)

订阅模式与前面的两种模式比较:多了一个角色Exchange交换机,接收生产者发送的消息并决定如何投递消息到其绑定的队列;消息的投递决定于交换机的类型。

发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列路由模式:使用了direct定向类型的交换机,消费会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息通配符模式:使用了topic通配符类型的交换机,消费会携带路由key(*, #),交换机根据消息的路由key与队列的路由key进行对比,匹配的话那么该队列可以接收到消息二、安装RabbitMQ(windows)

RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(OpenTelecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配。

安装erlang

下载地址:http://erlang.org/download/otp_win64_20.3.exe
以管理员身份运行此文件进行安装。 。

 erlang安装完成需要配置erlang 系统环境变量: ERLANG_HOME=C:Program Fileserl9.3(以实际路径为主) 在path中添加%ERLANG_HOME%bin;

安装RabbitMQ

 下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14
以管理员身份运行此文件进行安装。安装完成后可以在系统服务中查看到RabbitMQ服务。

配置插件

为了更加方便的管理RabbitMQ服务,可以安装RabbitMQ提供的一个浏览器端管理插件,可以通过浏览器页面方便
的进行服务管理。
安装方式:
1、以管理员身份打开 cmd (不是PowerShell);然后进入在RabbitMQ的安装目录下sbin目录

2、在上述窗口执行命令: rabbitmq-plugins.bat enable rabbitmq_management

 打开浏览器访问网站http://localhost:15672进入登录页面,默认账号和密码都为guest

三、整合springboot

这边介绍订阅模式中的路由模式,广播则改变交换机类型并且不使用路由key即可。(队列模式则不使用交换机通过队列直接传输比较简单不做介绍)

先启动RabbitMQ

引入pom文件

org.springframework.boot spring-boot-starter-amqp

基本配置 (admin是我自己添加的用户)

spring: rabbitmq: host: localhost port: 5672 virtual-host: /admin username: admin password: 123456

该配置用于传输对象自动转换json

@Configurationpublic class RabbitConfig { @Bean public Jackson2JsonMessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }}

代码实现

添加交换机、队列、路由key(一个交换机可绑定多个队列,注释的地方为另一种绑定队列交换机的方式)

@Configurationpublic class RabbitMQConfig { //交换机名称 public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange"; //队列名称 public static final String ITEM_QUEUE = "item_queue"; //路由key名称 public static final String ITEM_KEY = "item.delete"; //声明交换机// @Bean("itemTopicExchange")// public Exchange topicExchange(){// return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();// }// //声明队列// @Bean("itemQueue")// public Queue itemQueue(){// return QueueBuilder.durable(ITEM_QUEUE).build();// }//// //绑定队列和交换机// @Bean// public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,@Qualifier("itemTopicExchange") Exchange exchange){// return BindingBuilder.bind(queue).to(exchange).with("item.delete").noargs();}

生产者及消费者

@Controllerpublic class StudentController {// @Autowired// private StudentMapper studentMapper; @Autowired private RabbitTemplate rabbitTemplate; @ResponseBody @RequestMapping(value = "/add" ,method = {RequestMethod.POST}) public void addStudent(){ Student student = new Student(); student.setId(1); student.setNaem("zs"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.delete",student); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.update",student+""); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.insert",student+""); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = RabbitMQConfig.ITEM_QUEUE), exchange = @Exchange(name = RabbitMQConfig.ITEM_TOPIC_EXCHANGE,type = ExchangeTypes.TOPIC), key = RabbitMQConfig.ITEM_KEY )) public void myListener1(Student message){ System.out.println("消费者1接收到的消息为:"+message); }}

调用接口之后消费者接收到 生产者中路由key为item.delete的对象信息。

需要注意若是修改交换机类型或者绑定队列,需要更改交换机名字或者进RabbitMQ后台删除原来的交换机,不然修改无效!

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。