一、消息队列
1.1 作用1.2 主流消息队列比较 二、RabbitMQ的安装
2.1 安装
2.1.1 Docker 方式2.1.2 原生方式(Ubuntu 20.04) 2.2 管理插件的用法 三、RabbitMQ快速入门
3.1 名词介绍3.2 Hello World!3.3 任务队列(work queue)
3.3.1 循环调度3.3.2 消息确认3.3.3 消息持久化3.3.4 公平调度 3.4 发布/订阅(Publish/Subscribe)
3.4.1 交换器(Exchanges)3.4.2 临时队列3.4.3 绑定 3.5 路由(Routing)
3.5.1 direct 交换器3.5.2 topic 交换器 一、消息队列
在计算机中,消息是一个程序发送给另一个程序的数据,而队列(一种数据结构)就是一个容器,存放在里面的东西,都是先放进来的先被拿出去。所以,消息队列就是在消息的传输过程中暂时保存消息的容器。使用队列,能够避免在收、发消息需要同步进行的弊端,可以让发送方直接将消息放如队列,避免因为等待接收方而阻塞。
1.1 作用
应用解耦:
把一个大系统拆分为多个小系统,互相之间用消息队列来做数据的交互或函数的调用。
流量削峰:
在流量特别大的应用场景内,比如秒杀活动。这种情况下,服务端很容易崩溃。但是我直接拒绝用户,对用户来说是一种极为不好的用户体验。所以,我们可以控制处理请求的速度,将暂时处理不了的请求放入消息队列,让用户稍等一会儿,这比直接拒绝好很多。
消息分发:
就是将数据发送给需要接收数据的其他主机、程序等
异步处理:
比如,在用户成功购买某一件商品后,同时向他发送订单的手机短信和电子邮件,然后跳转到成功页面:
没有消息队列:先将订单数据交给短信模块发送手机短信,然后交给邮件模块发送电子邮件,完成后跳转到成功页面。使用消息队列:直接跳转到成功页面,至于手机短信和电子邮件,可以把订单数据放入消息队列中,短信模块和邮件模块在后台异步读取数据并发送。 1.2 主流消息队列比较 Kafka RocketMQ RabbitMQ 单机吞吐量十万级十万级万级消息延迟毫秒级毫秒级微秒级可用性非常高(分布式)非常高(分布式)高(主从)消息丢失理论上不会丢失理论上不会丢失低社区活跃度高中高二、RabbitMQ的安装 2.1 安装 2.1.1 Docker 方式
# for RabbitMQ 3.9, the latest seriesdocker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management# for RabbitMQ 3.8,# 3.8.x support timeline: https://www.rabbitmq.com/versions.htmldocker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
2.1.2 原生方式(Ubuntu 20.04)将下面的代码,保存到rabbitMQ_install.sh文件中:
#!/usr/bin/shsudo apt-get install curl gnupg apt-transport-https -y## Team RabbitMQ's main signing keycurl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null## Cloudsmith: modern Erlang repositorycurl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null## Cloudsmith: RabbitMQ repositorycurl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null## Add apt repositories maintained by Team RabbitMQsudo tee /etc/apt/sources.list.d/rabbitmq.list <
给文件添加执行权限:
sudo chmod +x ./rabbitMQ_install.sh
执行脚本:
sudo bash ./rabbitMQ_install.sh
2.2 管理插件的用法启动 RabbitMQ服务:
sudo systemctl start rabbitmq-server
创建 RabbitMQ用户:
sudo rabbitmqctl add_user '用户名' '密码'
其实,RabbitMQ自带了一个来宾用户,用户名和密码都是guest。
授予管理员权限:
sudo rabbitmqctl set_user_tags 用户名 administrator
向虚拟主机中的用户授予所有权限:
sudo rabbitmqctl set_permissions -p / 用户名 '.*' '.*' '.*'
启用管理插件:
sudo rabbitmq-plugins enable rabbitmq_management
访问管理页面:http://localhost:15672/,输入前面创建的用户名和密码即可。
三、RabbitMQ快速入门 3.1 名词介绍
生产(Producing):即发送消息,发送消息的程序被称为“生产者( producer)”。
消费(Consuming):即接收消息,接收消息的程序被称为“消费者(consumer)”
队列(Queue):就是存放消息的地方。队列只受主机的内存和磁盘限制,它本质上是一个大的消息缓冲区。
可以有多个生产者将消息发送到一个队列,也可以有多个消费者尝试从一个队列接收数据。
注意:生产者、消费者、消息队列可以放在不同的机器上;生产者和消费者也可以是同一个程序。
3.2 Hello World!接下来,我们用 python 代码编写两个小程序,生产者发送消息(将消息放入队列),消费者接收消息(从队列取出消息),消息内容为“Hello World!”。这是一个最精简的 RabbitMQ 的使用过程。
安装 RabbitMQ 官方推荐的 python 客户端:
pip install pika
发送消息,新建 send.py 文件:
#!/usr/bin/env pythonimport pika# 连接到本地的消息队列connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 创建队列,队列名称为 hellochannel.queue_declare(queue='hello')# 发送消息channel.basic_publish(exchange='', routing_key='hello', # 队列名称 body='Hello World!') # 消息内容print(" [x] 发送 'Hello World!'")# 关闭连接connection.close() # 会自动刷新网络缓存区,确保消息被发送
接收消息,新建 receive.py 文件:
#!/usr/bin/env pythonimport pika, sys, osdef main(): # 连接到本地的消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 创建队列,队列名称为 hello channel.queue_declare(queue='hello') # 回调函数,每当接收到消息时被调用 def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body) channel.basic_consume(queue='hello', # 指定接受消息的队列 auto_ack=True, # 是否自动调用回调函数 on_message_callback=callback) # 指定回调函数 print(' [*] 等待消息、按下 CTRL+C 退出') # 进入一个循环,等待消息 channel.start_consuming()if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)
注意:消费者和生产者都进行了同一个队列的创建,目的是无论哪一方先被启动,都能确保队列存在。
启动消费者:
python receive.py# [*] 等待消息、按下 CTRL+C 退出# [x] 接收到 b'Hello World!'
启动生产者:
python send.py# [x] 发送 'Hello World!'
3.3 任务队列(work queue)任务队列(也叫工作队列)是存储了一些资源密集型任务的队列,这些任务被封装成一个个的消息。目的是为了避免资源密集型任务对当前进程的阻塞,在后台中让专门的任务处理进程(worker)从任务队列中取出任务(就是消息,这里叫任务更加贴切)去执行。而且,当有多个任务处理进程一起工作时,便于任务在它们之间共享。
修改 send.py 中的代码,保存为 new_task.py ,要修改的部分如下:
import sys# 要处理的任务message = ' '.join(sys.argv[1:]) or "Hello World!"channel.basic_publish(exchange='', routing_key='hello', body=message) # 改为任务
修改 receive.py 中的代码,保存为 worker.py:
import timedef callback(ch, method, properties, body): print(" [x] 接收到 %r" % body.decode()) # 消息体中有几个“.”就睡几秒钟,假装任务处理很耗时 time.sleep(body.count(b'.')) print(" [x] 完成")
3.3.1 循环调度使用任务队列的优点之一是能够轻松地扩大规模。如果我们的任务被大量积压,来不及处理,我们可以很轻松地增加更多的任务处理进程(以下都将简称为 worker)。
启动两个终端,运行2个 worker.py:
python worker.py# [*] 等待消息、按下 CTRL+C 退出
再启动一个终端,运行 new_task.py:
python new_task.py First message.python new_task.py Second message..python new_task.py Third message...python new_task.py Fourth message....python new_task.py Fifth message.....
看看 worker 收到了什么:
# 第一个 worker: [x] 接收到 'First message.' [x] 完成 [x] 接收到 'Third message...' [x] 完成 [x] 接收到 'Fifth message.....' [x] 完成 # 第二个 worker: [x] 接收到 'Second message..' [x] 完成 [x] 接收到 'Fourth message....' [x] 完成
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个 worker。一般来说,每个 worker 都会收到相同数量的任务。这种分发任务的方式称为轮询。
3.3.2 消息确认在上面的代码中,如果一个 worker 在执行任务的过程中被终止了,那么任务会丢失。为了避免任务丢失,RabbitMQ 提供了消息确认机制:
如果 worker 回复一个 ack(acknowledgement),那么 RabbitMQ 就认为该任务被成功处理,然后从任务队列删除该任务。如果一个 worker 没有回复 ack 就终止运行了,那么 RabbitMQ 就认为该任务执行失败,会将该任务重新排队,分配给其他运行中的 worker。
消息确认有一个超时时间,默认为 30分钟。它能帮助我们检测异常的 worker。
默认情况下,手动消息确认是打开的。在前面的例子中,我们通过auto_ack=True关闭了它们。
继续修改代码,移除关闭消息确认的参数:
def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body.decode()) time.sleep(body.count(b'.')) print(" [x] 完成") ch.basic_ack(delivery_tag = method.delivery_tag) # 发送 ackchannel.basic_consume(queue='hello', on_message_callback=callback)
使用这段代码后,即使你用 CTRL+C 终止一个正在处理任务的 worker,也不会有任何损失。
3.3.3 消息持久化我们已经学会了如何确保即使 worker 意外终止,任务也不会丢失。但是如果RabbitMQ 服务器停止,我们的任务仍然会丢失。要确保任务不会丢失,需要做两件事:将队列和任务都标记为持久。
标记队列为持久,以便 RabbitMQ 重启后队列能够存活:
# 因为 hello 队列已经存在,所以我们换了个新队列 task_queuechannel.queue_declare(queue='task_queue', durable=True)
注意!持久化标记要在生产者和消费者上都标记上。
标记任务为持久:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE ))
3.3.4 公平调度将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接收到消息并且还没有保存消息时,仍然会有一个很短的空档。
此外,RabbitMQ 不会对每条消息做fsync(2)处理——它可能只是被保存在缓存中,而不是真正写入磁盘。
持久性保证不是强的,但对于我们的简单任务队列来说已经足够了。如果你需要一个更强的保证,那么你可以使用 publisher /confirm/i。
RabbitMQ 默认均匀地分配任务,即使有的任务特别耗时,使得分配到该任务地 worker 任务已经堆积如山,RabbitMQ 还是在均匀分配。
这是因为 RabbitMQ 在任务进入队列时只进行分配,不查看 worker 未确认消息的数量,盲目地将第 n 个任务发送给第 n 个 worker。不过,我们可以通过设置,改变这种行为。
在 worker 中配置公平调度:
channel.basic_qos(prefetch_count=1)
上面地数字是1,也就是说在 worker 处理并确认前一个任务完成之前,不要向它分配新任务,而是将任务发送给下一个没有工作中的 worker。
3.4 发布/订阅(Publish/Subscribe)如果所有的 worker 都很忙,任务队列就会排满。你需要注意这一点,并尽可能添加更多的工作人员,或者使用消息 TTL。
发布者(即生产者)发送一条消息,每一个订阅者(即消费者)都能接收到,这种模式称为“发布/订阅”。
3.4.1 交换器(Exchanges)之前我们说:生产者发送的消息被保存到队列中。这只是为了好理解而这么说的。实际上,生产者从不直接向队列发送任何消息,而是发送给交换器(exchange)。由交换器决定消息下一步去往何处,具体的规则由交换器类型定义。
交换类型有 direct、topic、header 和 fanout。创建一个 fanout 类型的交换器,取名为logs:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout 交换器十分简单,它只是单纯地把消息发送给它所知道的队列。
将消息发送到创建好的交换器:
channel.basic_publish(exchange='logs', # 通过名字指定交换器 routing_key='', body=message)
3.4.2 临时队列之前我们使用的都是命名的队列,像hello、task_queue。如果我们创建队列时不传入名称会怎样呢?答案是 RabbitMQ 会创建一个临时队列,该队列的名称会像这样:“mq.gen-JzTY20BRgKO-HjmUJj0wLg”。
我们还可以传入一个exclusive=True实现独占效果:当消费者关闭连接后,该队列也会随之消失。
创建一个独占的临时队列:
result = channel.queue_declare(queue='', exclusive=True)
3.4.3 绑定在接收者中绑定交换器和队列,也就是订阅:
channel.queue_bind(exchange='logs', queue=result.method.queue)
之后,logs 交换器将向我们的临时队列添加消息。只要接收者都绑定同一个交换器和队列,就都能收到消息。
我们还可以通过以下命令查看绑定信息(前提是接收者运行中):
sudo rabbitmqctl list_bindings
3.5 路由(Routing)目前,我们的发布/订阅模型中,订阅者会毫无选择地接收发布者的所有消息。不过我们可以在绑定时,通过routing_key参数进行过滤,让订阅者选择想要的消息接收。
routing_key的含义取决于交换器的类型,之前使用的 fanout 类型会直接忽视这个参数。所以,需要使用其他类型的交换器。比如,Direct 交换器。
3.5.1 direct 交换器它背后的算法很简单:
先看basic_publish()的routing_key是否和queue_bind()的routing_key匹配;匹配就放入队列,不匹配就去匹配其他队列;没有匹配的就丢弃消息。# 发布消息channel.basic_publish(exchange='direct_logs', routing_key='black', body=message)# 绑定channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
RabbitMQ 允许使用同一个routing_key绑定多个队列。交换器会将消息发送给多个匹配的队列。
3.5.2 topic 交换器topic 交换器的routing_key必须是用.分割的多个词语,如orange.rabbit.lazy,最长为255个字节。发布时的routing_key为可以按照.分割。如果二者之间匹配,则将消息发送给队列,如果不匹配则丢弃。这和 direct 交换器是一样的。但topic 交换器真正的用处在于两个特殊符号:
*:代表匹配任意一个单词。比如,*.orange.*可以匹配到中间是orange的单词数量为3个的任意routing_key。#:代表匹配任意多个单词。比如,rabbit.#可以匹配到rabbit开头的任意routing_key,且单词数量在合法范围内没有限制。