1.RocketMq架构2.消息不丢失
2.1 同步发送2.2 异步消息2.3 刷盘机制2.4 Broker 多副本和高可用2.5 消息确认2.6 Consumer 重试2.7 事务消息2.8 消息索引2.9 极端 1.RocketMq架构
Producer,Consumer,Brocker,Name Server
1.Producer发送消息
2.Brocker保存消息
3.Consumer 消费消息
4.Brocker主从切换
public void send() throws Exception { String message = "test producer"; Message sendMessage = new Message("topic1", "tag1", message.getBytes()); sendMessage.putUserProperty("name1","value1"); SendResult sendResult = null; DefaultMQProducer producer = new DefaultMQProducer("testGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); try { sendResult = producer.send(sendMessage); } catch (Exception e) { e.printStackTrace(); } if (sendResult != null) { System.out.println(sendResult.getSendStatus()); }}
同步发送会返回状态码
1.SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功。
2.FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。
3.FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。
4.SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。
根据状态码可以重复消息,重试的数量为3
2.2 异步消息public void sendAsync() throws Exception { String message = "test producer"; Message sendMessage = new Message("topic1", "tag1", message.getBytes()); sendMessage.putUserProperty("name1","value1"); DefaultMQProducer producer = new DefaultMQProducer("testGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); producer.send(sendMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable e) { // TODO 可以在这里加入重试逻辑 } });}
异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。
2.3 刷盘机制 异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置 2.4 Broker 多副本和高可用Broker 为了保证高可用,采用一主多从的方式部署。
消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。
这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:
brokerRole=SYNC_MASTER
同步复制:
slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。 2.5 消息确认public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("topic1", "tag1"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try{ System.out.printf("Receive New Messages: %s", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch (Exception e){ e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start();}
如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。
2.6 Consumer 重试 返回 RECONSUME_LATER返回 null抛出异常Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。
Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。Consumer 端一定要做好幂等处理。Consumer 给Brocker 结束重试, 这里是count = 3 的时候结束重试
int count = ((MessageExt) msgs).getReconsumeTimes();if (count > 2) { //TODO 把消息写入本地存储 System.out.println("重试次数超过3次"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
2.7 事务消息RocketMq支持事务
Producer 发送half消息Brocker先把消息写入topic, RMQ_SYS_TRANS_HALF_TOPIC的队列,然后返回half消息给producer成功Producer 执行本地事务,成功后给 Broker 发送 commit 命令, 或者rollbackBroker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topic;Consumer拉取消息进行消费public class ProducerTransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return LocalTransactionState.UNKNOW; }}
2.8 消息索引RocketMQ 核心的数据文件有 3 个:CommitLog、ConsumeQueue 和 Index, Index是一个索引文件
查找消息时,首先根据消息 key 的 hashcode 计算出 Hash 槽的位置,然后读取 Hash 槽的值计算 Index 条目的位置,从Index 条目位置读取到消息在 CommitLog 文件中的 offset,从而查找到消息。
Producer 发送消息时,可以指定一个 key
Message sendMessage = new Message("topic1", "tag1", message.getBytes());sendMessage.setKeys("weiyiid");
这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。
2.9 极端极端情况比如Rocketmq集群挂了, Producer发送消息一定失败, 可以在Producer做降级, 把发送的消息先存储在磁盘或者数据库中, 然后等到Rocketmq集群恢复了再推送消息