RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
一、四大核心概念生产者 :产生数据发送消息的程序是生产者
交换机 :交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列 :队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 消费者 消费与接收具有相似的含义。
消费者:大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
二、RabbitMQ安装1)官网地址 https://www.rabbitmq.com/download.html
2)文件上传 上传到/local/software 目录下(自定义创建目录存放rabbitmq安装文件)
3)安装文件(分别按照以下顺序安装)
#首先安装rabbitmq需要的erlang环境;建议使用 centos7.X版本的系统rpm -ivh erlang-21.3-1.el7.x86_64.rpmyum install socat -yrpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm常用命令(按照以下顺序执行)添加开机启动 RabbitMQ 服务: chkconfig rabbitmq-server on启动服务: /sbin/service rabbitmq-server start 查看服务状态: /sbin/service rabbitmq-server status
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest)访问地址 http://192.168.0.195:15672/出现权限问题
添加一个新的用户#创建账号(用户名:admin 密码:123)rabbitmqctl add_user admin 123设置用户角色rabbitmqctl set_user_tags admin administrator设置用户权限rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限查看当前用户和角色rabbitmqctl list_users
可使用admin 、123登录rabbitmq后台了!
关闭应用的命令为:rabbitmqctl stop_app清除的命令为:rabbitmqctl reset重新启动命令为:rabbitmqctl start_app
三、hello world1)pom.xml依赖
2)生产者
package com.st.rabbitmq.one_test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { private final static String QUEUE_NAME = "hello";//队列名称 public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.195"); factory.setUsername("admin"); factory.setPassword("123"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="This is rabbitmq send first message!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }}
3)消费者
package com.st.rabbitmq.one_test;import com.rabbitmq.client.*;import com.st.rabbitmq.Util.RabbitMqUtils;public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息...."); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); }}
4)RabbitMqUtils工具类
package com.st.rabbitmq.Util;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.195"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; }}