原理图
第一部分一构建pullrequest第二部分一pullrequest拉取
消息拉取核心流程图消息拉取核心源码 总结 原理图
由再平衡线程构建pullrequest加入拉取线程的pullRequestQueue拉取线程不断获取pullRequest后调用对应的MQConsumerInner进行消息拉取拉取结果加入ProcessQueue, 并循环拉取
第一部分一构建pullrequest
Rebalance等组件构建并调用executePullRequestImmediately将pullRequest加入pullRequestQueue
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); }}
第二部分一pullrequest拉取自旋拉取消息指定消费者组对应的MQConsumerInner进行消息拉取
public void run() {.....、删除其他代码 while (!this.isStopped()) { PullRequest pullRequest = this.pullRequestQueue.take(); 拉取消息 this.pullMessage(pullRequest); }}private void pullMessage(final PullRequest pullRequest) {.....、删除其他代码 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; 消费端拉取消息 交给拉取消息的核心实现者PullApiWrapper impl.pullMessage(pullRequest);}
消息拉取核心流程图 消息拉取核心源码获取ProcessQueue校验延迟拉取,包括流控,偏移量控制等构建回调函数客户端直接对tag名称过滤消息存在则处理消息消费,消息内存存储processqueue 再次拉取消息不存在或无新消息继续拉取消息违法则不在拉取设置长轮询相关参数执行消息异步拉取并上报消费进度
public void pullMessage(final PullRequest pullRequest) { 对应 RebalanceImpl.processQueueTable 的ProcessQueue 和messageQueue一一对应 final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); return; } if (this.isPause()) { log.warn("consumer was paused, execute pull request later、instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } start 对消息队列中的数量和大小判断 进行流控 阈值1000条或者100M long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } end 对消息队列中的数量和大小判断 进行流控 阈值1000条或者100M 一些校验 if (!this.consumeOrderly) { 非顺序消费校验 判断processQueue的offset偏移量 偏移量过大则一会在消费 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } } else { 顺序消费校验 if (processQueue.isLocked()) { 不是第一次获取锁 则需要设置消费位点 if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); 出现故障 消费位点超前 注意此外master不可用等也会造成brokerBusy 或者pageCache 竞争激烈等 boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker、pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset、pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { 没有获取本地锁 则一会在消费 this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}", pullRequest); return; } } 根据主题获取订阅逻辑信息 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { 如果没有订阅信息 3秒后在执行拉取 this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; } final long beginTimestamp = System.currentTimeMillis(); 拉取消息回调函数 PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { 服务端taghashcode过滤 客户端直接对tag名称过滤 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: // 统计rt信息 long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { // 如果消息体数量为0 则重新拉消息 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); // 增加统计信息 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); 核心1: 将拉取到的32条消息添加到processQueue的msgTreeMap boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); 核心2: 将拉取到的32条消息构建consumerequest 给消费线程消费 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); 拉取间隔 默认0 重新拉取 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: 继续拉取 pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: 继续拉取 pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { 如果offset不合法 则重置pullRequest的消费位点 DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); 同时移除掉对其的消费 等待后续拉取nameServer相关信息触发变更再触发再平衡后消费 DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }; 集群模式需要持久化到broker端当前的commitOffset boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } String subexpression = null; boolean classFilter = false; SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subexpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } 将长轮询等机制 int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subexpression != null, // subscription classFilter // class filter ); try { 真正的拉取消息 异步拉取 除去定时持久化 这里也会告诉broker持久化 this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(),// 消息消费队列 subexpression, // 消息订阅子模式subscribe( topicName, "模式") subscriptionData.getexpressionType(), subscriptionData.getSubVersion(),// 版本 // consume queue的offset pullRequest.getNextOffset(), 一次拉取多大的消息 this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, // 需要持久化:当前已经到达的消费进度 【注意 由于消息是msgMaptree 是有序的 如果提亲消费了更大的offset消息 这里返回的依旧是最小的offset】 commitOffsetValue, // 长轮询的broker超时时间 15秒 BROKER_SUSPEND_MAX_TIME_MILLIS, // 长轮询消费端超时 30s 也就是15秒后服务端会返回 但是客户端的超时是30秒 注意如果第二次请求这里的时间为30-第一次消耗时间 CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 异步调用 CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); }}
总结消息拉取客户端启动了长轮询长轮询的broker超时时间 15秒长轮询消费端超时 30s 也就是15秒后服务端会返回 但是客户端的超时是30秒 注意如果第二次请求这里的时间为30-第一次消耗时间同时持久化消息进度异步拉取拉取完成会将消息加入processqueue同时出发消费消息线程执行一次拉取默认32条sysFlag包含Filter,COMMIT_OFFSET,SUSPEND等机制回调函数对tag进行字符串匹配过滤[服务端只进行tag hash过滤]