欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ入门之常见模式

时间:2023-07-15
1.什么是MQ

MQ(Message Queue):消息队列,是一种"先进先出"的数据结构。典型的模型就是我们所说的生产者、消费者模型。生产者不断地向消息队列中生产消息,消费者不断地从消息队列中获取消息,同时消息的生产和消费都是异步的,可以实现系统间的解耦

2.什么是RabbitMQ

RabbitMQ是使用Erlang语言开发的基于高级消息队列协议(Advanced Message Queuing Protocol,AMQP)的开源消息队列。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、数据可靠性、数据安全性

3.安装RabbitMQ

使用cat /etc/os-release查看系统版本号,我这里使用的是Ubuntu 20.04,对应的分支是focal

cat /etc/os-releaseNAME="Ubuntu"VERSION="20.04.2 LTS (Focal Fossa)"ID=ubuntuID_LIKE=debianPRETTY_NAME="Ubuntu 20.04.2 LTS"VERSION_ID="20.04"HOME_URL="https://www.ubuntu.com/"SUPPORT_URL="https://help.ubuntu.com/"BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"VERSION_CODENAME=focalUBUNTU_CODENAME=focal

3.1 安装erlang和RabbitMQ

使用以下脚本快速安装:

#!/usr/bin/shsudo apt-get install curl gnupg apt-transport-https -y## Team RabbitMQ's main signing keycurl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null## Cloudsmith: modern Erlang repositorycurl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null## Cloudsmith: RabbitMQ repositorycurl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null## Add apt repositories maintained by Team RabbitMQsudo tee /etc/apt/sources.list.d/rabbitmq.list <

3.2 启动RabbitMQ

# 启动rabbitmqsystemctl start rabbitmq-server# 查看rabbitmq运行状态systemctl status rabbitmq-server

3.3 加载web管理界面插件

# 加载RabbitMQ的插件,这样我们可以使用web界面来管理RabbitMQ,默认使用guest用户登录,且必须使用localhost:15672来访问管理界面sudo rabbitmq-plugins enable rabbitmq_management# username:guest# password:guest

3.4 RabbitMQ配置文件

RabbitMQ给我们提供了一个配置文件模版,我们可以参照这个来配置。

模版文件地址:https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example

在/etc/rabbitmq目录下创建rabbitmq.conf

# 文件名rabbitmq.conf# 当该值为true时,我们只能通过localhost:15672来访问管理界面# 当该值为false时,我们可以通过ip:15672来访问管理界面loopback_users.guest = false

3.5 相关命令

# 查看相关命令的使用sudo rabbitmqctl help

4、Java整合RabbitMQ 4.1 引入依赖

com.rabbitmqamqp-client5.10.0

4.2 第一种模型(直连)

直连模式下,只有一个生产者和消费者,如果消费者处理消息的速度慢,但是生产者在源源不断的生产消息,就会导致消息的挤压

4.2.1 创建生产者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 如果不设置,默认使用guest用户 factory.setUsername("stone"); factory.setPassword("123456"); factory.setHost("192.168.0.19"); factory.setVirtualHost("test"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments // 声明与队列相关的参数,boolean durable 如果设置为true的话就是将队列持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "Hello World!"; // String exchange, String routingKey, BasicProperties props, byte[] body // 发布消息 // MessageProperties.PERSISTENT_TEXT_PLAIN 将消息持久化 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } }}

4.2.2 创建消费者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class Recv { // 这里我们并没有使用try-with-resource语句自动关闭channel和connection,这样可以使程序一直保持运行接收消息 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 如果不设置,默认使用guest用户 factory.setUsername("stone"); factory.setPassword("123456"); factory.setHost("192.168.0.19"); factory.setVirtualHost("test"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages、To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }}

4.3第二种模型(Work Queue)

在工作队列模式中,默认使用的是轮询调度(Round-robin dispatching),RabbitMQ将会依次将消息发送给每个消费者,每个消费者将获得相同数量的消息

也可以手动设置为公平调度(Fair dispatch),即处理消息快的消费者会获得更多数量的消息来处理,处理消息慢的消费者获得的消息数量相对较少。

// 告诉RabbitMQ一次只给消费者一条消息,在该消费者处理完上一条消息之前,不再给该消费者发送消息channel.basicQos(1);

4.3.1 生产者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 如果不设置,默认使用guest用户 factory.setUsername("stone"); factory.setPassword("123456"); factory.setHost("192.168.0.104"); factory.setVirtualHost("test"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 队列持久化 boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); for (int i = 0; i < 100; i++) { String message = "task_message_."; message = message + i; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }}

4.3.2 消费者1

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class Worker1 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 如果不设置,默认使用guest用户 factory.setUsername("stone"); factory.setPassword("123456"); factory.setHost("192.168.0.104"); factory.setVirtualHost("test"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages、To exit press CTRL+C"); // 指示一次性只接收一条消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(3); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } }}

4.3.3 消费者2

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class Worker2 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 如果不设置,默认使用guest用户 factory.setUsername("stone"); factory.setPassword("123456"); factory.setHost("192.168.0.104"); factory.setVirtualHost("test"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages、To exit press CTRL+C"); // 指示一次性只接收一条消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } }}

4.4 第三种模型(Publish/Subscribe)

在Publish/Subscribe模型中,生产者将消息发布到Exchange中,Exchange将消息推送到队列中,消费者再去队列中获得消息进行消费,这种模式是将一条消息发送给多个消费者。

RabbitMQ中消息传递模型的核心思想是,生产者从不直接向队列发送任何消息。生产者将消息发送到Exchange,Exchange将消息推送到队列中。

Exchange的类型:direct,topic,headers 和fanout。

**在这种模式下,我们主要介绍fanout类型的Exchange。**使用fanout类型,不需要设置routingKey,Exchange会将消息广播到与之绑定的所有的队列中。

direct

direct类型的Exchangequeue和Exchange绑定,并设置一个routingKey和routingKey完全匹配的消息将被路由到queue topic

topic类型的交换机queue和Exchange绑定,并设置一个规则的routingKey匹配routingKey规则的消息,将路由到指定的queue路由规则

* 指代一个字符# 指代一个或多个字符 headersfanout

fanout类型的Exchangequeue和Exchange绑定,不设置routingKey将收到的消息广播到与之绑定的所有队列 4.4.1 生产者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class Provider { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.106"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for (int i = 0; i < 10; i++) { String message = "测试fanout模型_" + i; // 因为是广播模型,所以不需要指定routingKey,消息将会推送至所有的queue channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }}

4.4.2 消费者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class Consumer { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.106"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 获得临时队列 String queueName = channel.queueDeclare().getQueue(); // 将交换机和queue绑定起来,无需routingKey channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages、To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}

4.5 第四种模型(Routing)

与Subscribe/Publish模式不同之处在于,Routing模式的Exchange类型是direct,并且queue和Exchange绑定的时候,设置了routingKey,只有routingKey完全匹配的消息才会路由到queue中。

当消息的routingKey为error时,消息将被路由到q1,当消息的routingKey为warning/info时,消息将被路由到q2。

4.5.1 生产者

public class Provider { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.106"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String routingKey = "info"; for (int i = 0; i < 10; i++) { String message = "direct_routing_message_" + i; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } }}

4.5.2 消费者

public class Consumer { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.106"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 获得临时队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "info"); System.out.println(" [*] Waiting for messages、To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}

该模式下需要routingKey完全匹配的消息才能路由到queue中,在消息类型多的情况下不利于扩展,于是Topics模式诞生了

4.6 第五种模型(Topics)

该模式与Routing模式的区别在于,Routing模式下,routingKey是指定的,Topics模式下,routingKey必须是一个单词列表,用.分割,最多255个字节,例如:"my.routingkey",我们可以使用通配符来指定路由规则。

当然如果我们在topics模式中,不使用特殊字符*和#,其效果和Routing模式是一样的。

*:指代一个字符

#:指代0个或多个字符

4.6.1 生产者

public class Provider { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = "message.a"; String message ="topic_message" ; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } }}

4.6.2 消费者

public class Consumer { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue();// 绑定队列,并设置路由规则,消费者能消费到routingKey例如:message.a,message.b的消息 channel.queueBind(queueName, EXCHANGE_NAME, "message.*"); System.out.println(" [*] Waiting for messages、To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}

最后,欢迎关注微信公众号一起交流

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。