为什么要有MQ?
流量削峰、应用解耦、异步处理
四大概念:
生产者、消费者、交换机、队列
六大模式(核心部分):
1.简单模式
2.工作模式
3.发布/订阅模式
4.路由模式
5.主题模式
6.发布确认模式
消息应答主要发生在消费者的工作线程这端。
消息应答机制(保证消息不丢失):消费者接到消息并处理后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。
消息发送后,立即被认为已经传送成功!一旦消费者出现连接或channel关闭,就会导致消息丢失!并且该模式没有对传递的消息数量进行限制。仅适用于消费者可以以某种速率高效处理消息的情况下使用。
手动应答//用于肯定确认,rabbitmq已经知道该消息被成功处理,可以丢弃channel.basicAck();//用于否定确认channel.basicNack(); //不处理该消息了,可以直接将其丢弃channel.basicReject();
手动应答的好处是可以批量应答,减少网络拥堵。
//true代表批量应答channel.basicAck(deliveryTag, true) ;
消息自动重新入队 消费者由于某些原因失去连接,导致消息未发送ack确认,rabbitmq会将其重新排队。重新分给另外一个消费者,确保不会丢失任何消息!
创建临时队列:
channel.queueDeclare().getQueue();
2.RabbitMQ持久化 如何保证rabbitmq服务停后,生产者发送的消息不丢失呢?
那么我们需要将队列和消息都标记为持久化,并进行发布确认!!!持久化到磁盘之后,重启服务,队列和消息就会仍然存在。
队列持久化:
//第二个参数为true表示队列标记为持久化//但是必须是要创建的新队列才能标记,否则报错channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
消息持久化:
//第三个参数设置为MessageProperties.PERSISTENT_TEXT_PLAIN//不能保证完全不会丢失消息channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
3.不公平分发轮询分发的策略下,有的消费者处理消息快,有的慢,就会导致处理消息快的消费者处于空闲状态。为了避免这种情况,可以设置
//默认是0,轮询分发//改为1,则消费快的消费者可以消费到更多消息channel.basicQos(1);
设置预取值可以指定给某个消费者消费的消息数量:
//设置预取值int preFetchCount = 3;channel.basicQos(preFetchCount);
4.发布确认 单个发布确认:属于同步发布确认的方式,但是发布速度特别慢!
批量发布确认:比单个发布确认要快,但是当出现故障时,不知道是哪个消息出现问题。
异步发布确认:性价比最高,无论是可靠性还是效率,利用函数回调达到消息的可靠性传递和投递成功!
RabbitMQ消息传递模型的核心思想:
生产者生产的消息不会直接发送到队列。
生产者只能将消息发送到交换机,由交换机推入队列。交换机的类型会决定把消息放到特定队列或者放到许多队列中或者丢弃。
Exchanges的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
1.如果routingKey相同,则就是fanout交换机。生产者发出消息,所有的消费者都能收到!
2.如果routingKey不同,则是direct交换机。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间)。
3.topic的routingKey不能随意写,必须是一个单词列表,以点号分隔开。星号可以代替一个单词,#可以代替一个或多个单词。
topic交换机包含fanout交换机和direct交换机。当一个队列的routingKey是#,代表这个队列接受所有的数据,就是fanout交换机。当一个队列的routingKey没有#和*,则该队列的绑定类型就是direct交换机。
由于某些特定原因导致queue中的消息无法被消费,消息没有后续的处理,就变成了死信。
应用场景1:为了保证订单业务的消息数据不丢失,需要用到RabbitMQ的死信队列机制。当消息消费异常时,将消息放到死信队列中,防止消息被丢失!
应用场景2:用户在商城下单成功,并点击去支付后,在指定时间内未支付自动失效。
死信的来源:
消息TTL(存活时间)过期队列达到最大长度(队列满了)消息被拒绝(basic.reject或basic.nack,一般情况下,消息没有被消费会被重新投递),并且requeue=false(设为false后,消息就不会被重新投递)
如果配置了死信队列消息,那么该消息会被丢到死信队列中。如果没有配置,该消息就会被丢弃!
7.延时队列 RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,第二个消息的延时时长很短,第二个消息并不会优先得到执行。
使用插件可以避免这种情况!
延时队列还有其他选择,Java的DelayQueue,利用redis的zset,利用Quartz或者利用kafka的时间轮。
8.幂等性 消息的重复消费。
rabbitmq消费端的幂等性保障:
1.唯一ID+指纹码机制
2.Redis原子性
利用redis执行setnx命令,天然具有幂等性,从而实现不重复消费。
要让队列实现优先级:
队列需要设置为优先级队列消息也需要设置为优先级消息消费者需要等消息全部已发送到队列中,再去消费,便于排序
优先级队列 0~255,越大越优先执行!
Map
当消费者由于各种各样的原因,比如消费者下线、宕机或由于维护而关闭,导致长时间内不能消费消息造成堆积。就有必要使用惰性队列,把消息保存在磁盘,而不是保存在内存。防止对内存造成较大压力。
队列具备两种模式,default和lazy。默认是default模式,可以用channel.queueDeclare()或Policy的方式设置,如果同时使用这两种,则Policy的方式拥有更高的优先级!
声明惰性队列:
Map
内存开销:
在发送100万条消息,每条消息占用1KB的情况下,普通队列占用内存是1.2GB,惰性队列占用1.5MB。
原因是惰性队列是把消息存在磁盘里,内存中存的是消息的索引!一旦需要消费消息时,会通过索引去磁盘上找消息。
优点:惰性队列占用内存相对来说会小很多。
缺点:惰性队列消费消息比较慢。