欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

分布式-MQ-01RabiitMQ安装及基本使用

时间:2023-04-21
RabiitMQ安装及基本使用

1 RabiitMQ安装2 MQ概述

2.1 MQ的优势2.2 MQ的劣势 3 常见的MQ产品4 控制台使用

4.1 新增用户及配置权限4.2 Virtual Hosts 配置 5 RabbitMQ的不同工作模式特性剖析

5.1 RabbitMQ快速入门之生产与消费5.2RabbitMQ工作模式之workQueues模式 1 RabiitMQ安装

RabbitMQ官网:https://rabbitmq.com/
适用于linux的安装版本:https://rabbitmq.com/install-rpm.html#downloads
当前最新稳定版本:rabbitmq-server-3.9.13-1.el8.noarch.rpm

#本地学习用的虚拟机是centos7系统,学习测试rabbit没有选择最新版本,本文的安装版本# rabbitmq-server-3.6.5-1.noarch.rpm# erlang-18.3-1.el7.centos.x86_64.rpm# socat-1.7.3.2-5.el7.lux.x86_64.rpm# 1.上传rpm包至服务器# 2.安装Erlangrpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm# 3.安装RabbitMQrpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpmrpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm# 4.开启管理界面及配置rabbitmq-plugins enable rabbitmq_management# 修改默认配置信息 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保 留guest# 5.启动service rabbitmq-server start # 启动服务 service rabbitmq-server stop # 停止服务service rabbitmq-server restart # 重启服务#设置配置文件cd /usr/share/doc/rabbitmq-server-3.6.5/ cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config#windows浏览器登陆管理台http://192.168.149.128:15672/# 6.配置虚拟主机及用户


小插曲:安装 socat-1.7.3.2-1.1.el7.x86_64.rpm时,提示error,需要以来插件。解决办法:无视依赖,直接–force --nodeps继续安装。

开启管理界面:

修改默认配置及开启服务(开放guest账号,loopback_users中的删除<<>>即可):


小插曲:此时尝试登陆rabbitMQ管理台发现返回502服务不可用,但是通过在linux服务器上却能下载到首页面,考虑是防火墙问题或端口未开放。

尝试开放端口及关闭防火墙均无效:

firewall-cmd --zone=public --add-port=15672/tcp --permanent #开放指定端口号firewall-cmd --zone=public --add-port=5672/tcp --permanent #开放指定端口号,后面接口调用需要systemctl stop firewalld #关闭防火墙firewall-cmd --reload #重启防火墙 添加完开放端口一样要重启防火墙

最终发现本地开启了全局代理访问,路由转出门再访问局域网肯定还不行的,关掉代理即可。

2 MQ概述

Message Queue消息队列,实在消息传输的过程中保存消息的容器,多用于分布式系统之间进行通信。

2.1 MQ的优势

应用解耦
系统的耦合度降低,容错率提高,可维护性越高。
当A、B、C服务间本不存在依赖关系,如果A服务异常,在同步调用时,将导致B、C服务无法继续执行。
异步提速
当服务提供的某些能力越来越强大、越来越复杂,势必无法稳定保证快速应答。此时将串行化程序,转为异步处理。
A、B、C服务内部处理MQ的任务,相互间不产生影响。
削峰填谷
MQ功能中极其重要的优势。将瞬时过量的请求,写入MQ进行限流,由消费端自行从MQ拉取任务进行处理。
现在后端服务架构中,常见时延要求不高,但处理任务量比较大的需求。且在无法预知某时间点用户侧并发量时,通过削峰填谷保证服务按照处理能力执行任务。
2.2 MQ的劣势

系统可用性降低
系统引入的外部依赖变多,系统的稳定性降低。一旦MQ宕机,会对业务功能造成影响。
解决办法:搭建集群,当单台MQ宕机时,保证其他机器正常可用。系统复杂度提高
MQ的加入增加了系统复杂度,以前的同步远程调用,现在变成异步调用,需要保证消息不丢失。 3 常见的MQ产品

RabbitMQ -> RocketMQ -> kafka -> DMQ

RabbitMQActiveMQRocketMQKafkaDMQ公司/社区RabbitApache阿里Apache华为开发语言ErlangJavaJavaScala&JavaJava协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义自定义协议,社区封装了http协议支持基于kafka封装客户端支持语言官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言Java,C,C++,Python,PHP,Perl,.net等Java,C++(不成熟)官方支持Java,社区产出多种API,如PHP,Python等Java、Python等单机吞吐量万级(其次)万级(最差)十万级(最好)十万级(次之)不详消息延迟微妙级毫秒级毫秒级毫秒以内毫秒级功能特性并发能力强,性能极其好,延时低,社区活跃,管理界面丰富老牌产品,成熟度高,文档较多MQ功能比较完备,扩展性佳只支持主要的MQ功能,毕竟是为大数据领域准备的。使用zookeeper作为注册中心,实现配置下发、服务发现;kafka作为分布式发布订阅消息系统[0.8.2]4 控制台使用 4.1 新增用户及配置权限


用户密码自不必多说,来看看Tags的区别:

超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。其他
无法登陆管理控制台,通常就是普通的生产者和消费者。 4.2 Virtual Hosts 配置

创建 Virtual Hosts设置 Virtual Hosts 权限
5 RabbitMQ的不同工作模式特性剖析

RabbitMQ消息队列都是基于消费者生产者模式组织的。其中的Bcoker、channel、Exchange、Queue等概念如下。

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message BrokerVirtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等Connection:publisher/consumer 和 broker 之间的TCP 连接Channel:类似于Netty的channel,进程间通过TCP连接时,如果建立长连接,通过socket传输内容,可以大大地减少建立连接的开销。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销


RabbitMQ的工作模式,在官网中讲的比较详细 https://www.rabbitmq.com/getstarted.html

5.1 RabbitMQ快速入门之生产与消费

//消费者import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.149.128"); connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号 connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/asky"); Connection connection = connectionFactory.newConnection(); //创建通信管道,相当于TCP中的虚拟连接 Channel channel = connection.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare("hello",false, false, false, null); //从MQ服务器中获取数据 //创建一个消息消费者 //第一个参数:队列名 //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法 //第三个参数要传入DefaultConsumer的实现类 channel.basicConsume("hello", false, new Reciver(channel)); }}class Reciver extends DefaultConsumer { private Channel channel; //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者接收到的消息:"+message); System.out.println("消息的TagId:"+envelope.getDeliveryTag()); //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.basicAck(envelope.getDeliveryTag(), false); }}

//生产者import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.149.128"); connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号 connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/asky"); Connection connection = connectionFactory.newConnection(); //创建通信管道,相当于TCP中的虚拟连接 Channel channel = connection.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare("hello",false, false, false, null); String message = "zwx"; //四个参数 //exchange 交换机,暂时用不到,发布订阅时才会用到 //队列名称 //额外的设置属性 //最后一个参数是要传递的消息字节数组 channel.basicPublish("","hello", null,message.getBytes()); channel.close(); connection.close(); System.out.println("===发送成功==="); }}

5.2RabbitMQ工作模式之workQueues模式


与简单模式的区别在于,工作队列下生产者与消费者可以是多个。而消费端没有topic、groupId一说,queue中的消息将依次被多个消费者消费,默认采用轮询模式。而如果为了区分机器性能和处理能力,可以设置如下设置,当机器处理完消息后再从Queue中拉取消息。

//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。