欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RoctetMQ如何保证顺序消费

时间:2023-06-20
文章目录

1:背景2:发送端3:消费端 1:背景

在一些特殊的场景下,需要保证消息的顺序性,rocketMQ 提供了确实有效的解决方法。需要发送端和消费端同时保证才行。
总结下来,需要满足下面几个条件:
1:同一个Topic
2:同一个Queue
3:发消息的时候用同一个线程发送消息
3:消费消息的时候用同一个线程消费一个 Queue 里面的消息,或者用提供的消息选择器 MessageListenerOrderly

2:发送端

示例代码:

private static void orderSend(DefaultMQProducer producer) throws Exception { Message message = new Message("myTopic", "我发送的第一条消息".getBytes()); // 消息 queue 选择器,向 topic 中的那个 queue 去写消息,返回选择好的 queue MessageQueueSelector selector = new MessageQueueSelector() { @Override // List mqs : topic 下的 queue 列表 // Message msg : 具体要发的那条消息 // Object arg :外面的 arg 参数,会被传递进来,自定义参数 public MessageQueue select(List mqs, Message msg, Object arg) { //假设选择第一个 queue return mqs.get((Integer) arg); } }; producer.send(message, selector, 1, 2000); // 或者使用提供的默认选择器实现 //producer.send(message,new SelectMessageQueueByHash(),1); //producer.send(message,new SelectMessageQueueByRandom(),1); // 根据机房去路由选择:未实现,需要根据源码自己去实现 //producer.send(message,new SelectMessageQueueByMachineRoom(),1); }

下面解释为什么要同一个 Topic,同一个 Queue,发消息的时候用同一个线程发送消息
原因:
rocketMQ 的物理结构上是没有 Topic 这个概念的,是以 Queue 为物理单位的,在逻辑上,一个 Topic 包含 4 个 Queue,Queue 队列大家都知道,FIFO,先进先出的队列。 因此在 同一个 Topic 下的 同一个 Queue 里是可以保证 消息发送的顺序性的。至于为什么用同一个线程来发送消息,是为了防止多线程环境下带来的不确定性。

3:消费端

示例代码:

private static void oneCost(DefaultMQPushConsumer consumer){ // 设置开启消费线程数,最大和最小 //consumer.setConsumeThreadMax(); //consumer.setConsumeThreadMin(); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { // 消费消息 consumerMessage(msgs); return ConsumeOrderlyStatus.SUCCESS; } }); }

MessageListenerConcurrently 是并发的开启多个线程为同一个 Queue 消费消息,MessageListenerOrderly 只为每一个 Queue 开启一个线程。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。