欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ概念、安装、helloworld(一)

时间:2023-05-12

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。

一、四大核心概念

生产者 :产生数据发送消息的程序是生产者

交换机 :交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列 :队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 消费者 消费与接收具有相似的含义。

消费者:大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

二、RabbitMQ安装 

1)官网地址 https://www.rabbitmq.com/download.html

2)文件上传 上传到/local/software 目录下(自定义创建目录存放rabbitmq安装文件)

3)安装文件(分别按照以下顺序安装)

#首先安装rabbitmq需要的erlang环境;建议使用 centos7.X版本的系统rpm -ivh erlang-21.3-1.el7.x86_64.rpmyum install socat -yrpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm常用命令(按照以下顺序执行)添加开机启动 RabbitMQ 服务: chkconfig rabbitmq-server on启动服务: /sbin/service rabbitmq-server start 查看服务状态: /sbin/service rabbitmq-server status

 停止服务(选择执行)

/sbin/service rabbitmq-server stop

开启 web 管理插件

rabbitmq-plugins enable rabbitmq_management

用默认账号密码(guest)访问地址 http://192.168.0.195:15672/出现权限问题

添加一个新的用户#创建账号(用户名:admin 密码:123)rabbitmqctl add_user admin 123设置用户角色rabbitmqctl set_user_tags admin administrator设置用户权限rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限查看当前用户和角色rabbitmqctl list_users

 

可使用admin 、123登录rabbitmq后台了!

关闭应用的命令为:rabbitmqctl stop_app清除的命令为:rabbitmqctl reset重新启动命令为:rabbitmqctl start_app

 三、hello world

1)pom.xml依赖

org.apache.maven.plugins maven-compiler-plugin 8 8 com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.6

2)生产者

package com.st.rabbitmq.one_test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { private final static String QUEUE_NAME = "hello";//队列名称 public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.195"); factory.setUsername("admin"); factory.setPassword("123"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="This is rabbitmq send first message!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }}

3)消费者

package com.st.rabbitmq.one_test;import com.rabbitmq.client.*;import com.st.rabbitmq.Util.RabbitMqUtils;public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息...."); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); }}

4)RabbitMqUtils工具类

package com.st.rabbitmq.Util;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.195"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。