欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RocketMQ不丢失消息的方式

时间:2023-07-26
文章目录

1.RocketMq架构2.消息不丢失

2.1 同步发送2.2 异步消息2.3 刷盘机制2.4 Broker 多副本和高可用2.5 消息确认2.6 Consumer 重试2.7 事务消息2.8 消息索引2.9 极端 1.RocketMq架构


Producer,Consumer,Brocker,Name Server

2.消息不丢失

1.Producer发送消息
2.Brocker保存消息
3.Consumer 消费消息
4.Brocker主从切换

2.1 同步发送

public void send() throws Exception { String message = "test producer"; Message sendMessage = new Message("topic1", "tag1", message.getBytes()); sendMessage.putUserProperty("name1","value1"); SendResult sendResult = null; DefaultMQProducer producer = new DefaultMQProducer("testGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); try { sendResult = producer.send(sendMessage); } catch (Exception e) { e.printStackTrace(); } if (sendResult != null) { System.out.println(sendResult.getSendStatus()); }}

同步发送会返回状态码

1.SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功。

2.FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。

3.FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。

4.SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。

根据状态码可以重复消息,重试的数量为3

2.2 异步消息

public void sendAsync() throws Exception { String message = "test producer"; Message sendMessage = new Message("topic1", "tag1", message.getBytes()); sendMessage.putUserProperty("name1","value1"); DefaultMQProducer producer = new DefaultMQProducer("testGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); producer.send(sendMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable e) { // TODO 可以在这里加入重试逻辑 } });}

异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。

2.3 刷盘机制 异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置 2.4 Broker 多副本和高可用

Broker 为了保证高可用,采用一主多从的方式部署。


消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

同步复制:

slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。 2.5 消息确认

public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("topic1", "tag1"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try{ System.out.printf("Receive New Messages: %s", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch (Exception e){ e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start();}

如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。

2.6 Consumer 重试 返回 RECONSUME_LATER返回 null抛出异常

Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。

Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。Consumer 端一定要做好幂等处理。

Consumer 给Brocker 结束重试, 这里是count = 3 的时候结束重试

int count = ((MessageExt) msgs).getReconsumeTimes();if (count > 2) { //TODO 把消息写入本地存储 System.out.println("重试次数超过3次"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}

2.7 事务消息

RocketMq支持事务

Producer 发送half消息Brocker先把消息写入topic, RMQ_SYS_TRANS_HALF_TOPIC的队列,然后返回half消息给producer成功Producer 执行本地事务,成功后给 Broker 发送 commit 命令, 或者rollbackBroker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topic;Consumer拉取消息进行消费

public class ProducerTransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return LocalTransactionState.UNKNOW; }}

2.8 消息索引

RocketMQ 核心的数据文件有 3 个:CommitLog、ConsumeQueue 和 Index, Index是一个索引文件


查找消息时,首先根据消息 key 的 hashcode 计算出 Hash 槽的位置,然后读取 Hash 槽的值计算 Index 条目的位置,从Index 条目位置读取到消息在 CommitLog 文件中的 offset,从而查找到消息。

Producer 发送消息时,可以指定一个 key

Message sendMessage = new Message("topic1", "tag1", message.getBytes());sendMessage.setKeys("weiyiid");

这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。

2.9 极端

极端情况比如Rocketmq集群挂了, Producer发送消息一定失败, 可以在Producer做降级, 把发送的消息先存储在磁盘或者数据库中, 然后等到Rocketmq集群恢复了再推送消息

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。