1:背景2:发送端3:消费端 1:背景
在一些特殊的场景下,需要保证消息的顺序性,rocketMQ 提供了确实有效的解决方法。需要发送端和消费端同时保证才行。
总结下来,需要满足下面几个条件:
1:同一个Topic
2:同一个Queue
3:发消息的时候用同一个线程发送消息
3:消费消息的时候用同一个线程消费一个 Queue 里面的消息,或者用提供的消息选择器 MessageListenerOrderly
示例代码:
private static void orderSend(DefaultMQProducer producer) throws Exception { Message message = new Message("myTopic", "我发送的第一条消息".getBytes()); // 消息 queue 选择器,向 topic 中的那个 queue 去写消息,返回选择好的 queue MessageQueueSelector selector = new MessageQueueSelector() { @Override // List
下面解释为什么要同一个 Topic,同一个 Queue,发消息的时候用同一个线程发送消息
原因:
rocketMQ 的物理结构上是没有 Topic 这个概念的,是以 Queue 为物理单位的,在逻辑上,一个 Topic 包含 4 个 Queue,Queue 队列大家都知道,FIFO,先进先出的队列。 因此在 同一个 Topic 下的 同一个 Queue 里是可以保证 消息发送的顺序性的。至于为什么用同一个线程来发送消息,是为了防止多线程环境下带来的不确定性。
示例代码:
private static void oneCost(DefaultMQPushConsumer consumer){ // 设置开启消费线程数,最大和最小 //consumer.setConsumeThreadMax(); //consumer.setConsumeThreadMin(); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List
MessageListenerConcurrently 是并发的开启多个线程为同一个 Queue 消费消息,MessageListenerOrderly 只为每一个 Queue 开启一个线程。