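Contents
Preface: PullCallback
Consumption flow diagram
ConsumeMessageOrderlyService: orderly consumption
    Service constructor
    Submitting a consume request
    Executing consumption
ConsumeMessageConcurrentlyService: concurrent consumption
    Service constructor
    Submitting a consume request
    Executing consumption

Preface: PullCallback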
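When the broker responds to the MQ client, the pullCallback callback runs. It stores the pulled messages in the local consumption buffer (processQueue) and then triggers the consumer threads to consume them.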
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        // ... other code omitted

        // Record pull round-trip-time (RT) statistics
        long prevRequestOffset = pullRequest.getNextOffset();
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
        long pullRT = System.currentTimeMillis() - beginTimestamp;
        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
            pullRequest.getMessageQueue().getTopic(), pullRT);

        long firstMsgOffset = Long.MAX_VALUE;
        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
            // No messages came back: pull again immediately
            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
        } else {
            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

            // Update pull TPS statistics
            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

            // Core step 1: add the pulled messages (32 per pull by default) to processQueue's msgTreeMap
            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

            // Core step 2: wrap the pulled messages in a ConsumeRequest for the consumer threads
            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                pullResult.getMsgFoundList(),
                processQueue,
                pullRequest.getMessageQueue(),
                dispatchToConsume);

            // ... hand the request back to PullRequestService to pull again
        }
        // ... other code omitted
    }
};
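To make core step 1 more concrete, here is a minimal sketch of an offset-ordered buffer. It is not RocketMQ's ProcessQueue: the class name OffsetOrderedBuffer, the String message bodies, and the put/take methods are assumptions made for illustration. It only shows why keeping messages in a TreeMap keyed by queue offset lets a consumer take them back in ascending offset order, whatever order they were inserted in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for the offset-ordered buffer; not the real ProcessQueue.
final class OffsetOrderedBuffer {
    // Queue offset -> message body. TreeMap keeps its keys in ascending order.
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    // Store one pulled message under its queue offset.
    synchronized void put(long queueOffset, String body) {
        msgTreeMap.put(queueOffset, body);
    }

    // Remove and return up to batchSize messages, lowest offsets first.
    synchronized List<String> take(int batchSize) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Map.Entry<Long, String> entry = msgTreeMap.pollFirstEntry();
            if (entry == null) {
                break; // buffer drained
            }
            batch.add(entry.getValue());
        }
        return batch;
    }

    public static void main(String[] args) {
        OffsetOrderedBuffer buffer = new OffsetOrderedBuffer();
        buffer.put(12L, "c");
        buffer.put(10L, "a");
        buffer.put(11L, "b");
        System.out.println(buffer.take(2)); // [a, b]
        System.out.println(buffer.take(2)); // [c]
    }
}
```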
Consumption flow diagram

ConsumeMessageOrderlyService: orderly consumption

Service constructor
Although this is orderly consumption, it still runs on multiple threads; ordering is guaranteed by locks.
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerOrderly messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    this.consumeExecutor = new ThreadPoolExecutor(
        // Core pool size: 20 threads by default
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        // Maximum pool size: 20 threads by default
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        // Unbounded blocking queue
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_")
        // No handler is passed, so the default rejection policy applies
    );

    this.scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
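The combination of a ThreadPoolExecutor with an unbounded LinkedBlockingQueue has a consequence worth seeing in isolation: the JDK only creates threads beyond the core size when the queue refuses a task, and an unbounded queue never does. The sketch below uses plain JDK classes; the class name UnboundedQueuePoolSketch, the observe helper, and the sizes 2, 4 and 10 are made up for the demonstration and are not RocketMQ defaults.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only standard JDK APIs are used.
final class UnboundedQueuePoolSketch {
    // Submits `tasks` blocking tasks and returns {poolSize, queuedTasks}
    // as observed before the tasks are released.
    static int[] observe(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, max, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();     // queued tasks still run before the pool terminates
        }
    }

    public static void main(String[] args) {
        int[] seen = observe(2, 4, 10);
        // The pool stays at the core size; the other 8 tasks wait in the queue.
        System.out.println("poolSize=" + seen[0] + " queued=" + seen[1]); // poolSize=2 queued=8
    }
}
```

With such a queue, a RejectedExecutionException can only be expected once the pool has been shut down, which is one reason the choice of queue matters as much as the thread counts.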
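Submitting a consume request
Unlike concurrent consumption, orderly consumption does not pass the messages in as an explicit argument. The ConsumeRequest consumes straight from processQueue, and if a ConsumeRequest is already running for that queue there is no need to build another one.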
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    // Build and submit a new ConsumeRequest only when processQueue holds messages
    // and no ConsumeRequest is already processing it
    if (dispathToConsume) {
        // consumeExecutor consumes directly from processQueue.
        // If an older ConsumeRequest is still running, a new one is unnecessary:
        // both would work on the same processQueue, which holds all the messages,
        // so a single ConsumeRequest is enough.
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
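The dispatch flag is easier to follow with a toy model. The sketch below is an assumption about how such a flag can behave, not a copy of ProcessQueue: the class DispatchFlagSketch, its consuming field, and the String message bodies are hypothetical. putMessages reports true only when no consume task currently owns the buffer, and the flag is cleared again once a take finds the buffer empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of a buffer that tells the puller whether a new consume task is needed.
final class DispatchFlagSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private boolean consuming = false; // true while a consume task owns the buffer

    // Stores the messages; returns true only if a new consume task should be submitted.
    synchronized boolean putMessages(long firstOffset, List<String> bodies) {
        long offset = firstOffset;
        for (String body : bodies) {
            msgTreeMap.put(offset++, body);
        }
        if (!msgTreeMap.isEmpty() && !consuming) {
            consuming = true;
            return true;
        }
        return false;
    }

    // Takes the next batch in offset order; clears the flag once nothing is left.
    synchronized List<String> takeMessages(int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !msgTreeMap.isEmpty()) {
            batch.add(msgTreeMap.pollFirstEntry().getValue());
        }
        if (batch.isEmpty()) {
            consuming = false; // the running task is about to exit
        }
        return batch;
    }
}
```

In this model a second pull that arrives while a task is still draining the buffer gets false back, so no duplicate task is queued; the running task simply sees the new messages on its next take.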
Executing consumption
ConsumeMessageOrderlyService.ConsumeRequest
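The consumeExecutor thread pool executes ConsumeRequest.run.
The pool has multiple threads, but given how submitConsumeRequest works, a queue is normally processed by a single thread at a time.
Ordering is guaranteed by locks, not by the number of threads.
Orderly consumption works on processQueue in these steps:
1. Get the batch size.
2. Take a batch of that size from processQueue, ordered by offset.
3. Run the before hook.
4. Lock at the message queue (messageQueue) level and at the process queue (processQueue) level.
5. Consume.
6. Run the after hook.
7. Handle the retry logic (blocking).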
class ConsumeRequest implements Runnable {
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    // ... other code omitted

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }

        // Fetch the lock object for the current messageQueue
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    // ... other code omitted

                    // Batch size; the default is usually one message
                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                    // processQueue stores messages in TreeMap<Long, MessageExt> msgTreeMap, keyed by
                    // message offset, so the batch taken here is ordered by ascending offset
                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
                        ConsumeOrderlyStatus status = null;

                        // ... omitted: run the before hook

                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            // Lock at the processQueue level
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }
                            // Consume
                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                            hasException = true;
                        } finally {
                            this.processQueue.getLockConsume().unlock();
                        }

                        // ... other code omitted
                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        // ... other code and the after hook omitted

                        // Record consume RT statistics
                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                            .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                        // Handle the retry logic (blocking)
                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }
                // Retry consumption later
                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }
}
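The idea behind fetching one lock object per message queue can be sketched on its own. The class below is not RocketMQ's messageQueueLock; QueueLockSketch, its String queue keys, and the countUnderLock helper are assumptions for illustration. It hands out one lock object per key, so threads working on the same queue serialize on it while different queues do not block each other.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-queue lock table; queue identity is reduced to a String key here.
final class QueueLockSketch {
    private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();

    // Returns the same lock object for every call with the same key.
    Object fetchLockObject(String queueKey) {
        return lockTable.computeIfAbsent(queueKey, key -> new Object());
    }

    // Starts `threads` workers that each bump a shared counter `perThread` times
    // while holding the lock of one queue; returns the final counter value.
    static int countUnderLock(int threads, int perThread) {
        QueueLockSketch locks = new QueueLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    synchronized (locks.fetchLockObject("topicA-queue0")) {
                        counter[0]++; // safe: only one thread holds this queue's lock
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countUnderLock(4, 1000)); // 4000
    }
}
```

However many pool threads exist, work on a single queue is serialized by the lock, which matches the article's point that ordering comes from locking rather than from limiting the thread count.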
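ConsumeMessageConcurrentlyService: concurrent consumption

Service constructor
consumeExecutor runs ConsumeRequest tasks (Runnable) to consume messages. If a submission is rejected with a RejectedExecutionException, the request is resubmitted to the consumer thread pool later through submitConsumeRequestLater.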
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    // Consumer thread pool parameters
    this.consumeExecutor = new ThreadPoolExecutor(
        // Core pool size: 20 by default
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        // Maximum pool size: 20 by default
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        // Unbounded blocking queue by default
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_")
        // No handler is passed, so the default rejection policy applies
    );

    this.scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
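Submitting a consume request
Consumption is handed to the consumeExecutor thread pool. The pulled msgs are split into batches and submitted to the pool directly.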
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // Configured maximum number of locally pulled messages passed to one consume call; the default is usually 1
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // By default 32 messages are pulled per request but only 1 is consumed per call
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // On rejection, fold all remaining messages into this request and submit it later
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
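The batching in the else branch can be isolated into a small helper. This is a sketch under assumptions: BatchSplitSketch and split are hypothetical names, and the helper uses subList instead of the index-based loop shown above. It covers only the splitting, not the rejection handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors only the batch-splitting part of submitConsumeRequest.
final class BatchSplitSketch {
    // Splits msgs into consecutive batches of at most batchSize elements, preserving order.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to))); // copy so batches are independent
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 32; i++) {
            pulled.add(i);
        }
        System.out.println(split(pulled, 1).size());  // 32
        System.out.println(split(pulled, 10).size()); // 4
    }
}
```

With 32 pulled messages and a batch size of 1, this yields 32 separate batches, which is why a single pull can fan out into many ConsumeRequest tasks on the thread pool.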
Executing consumption
ConsumeMessageConcurrentlyService.ConsumeRequest
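The consumeExecutor thread pool executes ConsumeRequest.run.
ConsumeRequest.msgs is consumed directly; no ordering has to be maintained. The steps are:
1. Skip consumption if the queue was dropped by a rebalance or another policy.
2. Get the business listener.
3. Run the before hook.
4. Run the consume logic on one batch (1 message by default).
5. Run the after hook.
6. Handle the retry logic based on ackIndex.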
class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    // The pulled messages are also stored in processQueue.msgTreeMap
    private final ProcessQueue processQueue;
    // The message queue that the pulled messages in ProcessQueue belong to
    private final MessageQueue messageQueue;

    public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        this.msgs = msgs;
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public List<MessageExt> getMsgs() {
        return msgs;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    @Override
    public void run() {
        // Dropped by a rebalance or another policy: nothing to consume
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }

        // Get the business listener
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

        ConsumeMessageContext consumeMessageContext = null;
        // Run the before hook
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            // Execute the hook before consuming
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }

        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // Run the consume logic on one batch (1 message by default)
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            // Run the after hook
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }

        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

        // Handle the retry logic based on ackIndex (retry_consume_topic)
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }
}
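The if/else chain that derives returnType in run() can be read as a small pure function. The sketch below restates it with stand-in types: ReturnTypeSketch, its two enums, and the classify signature are hypothetical, and the timeout of 15 minutes in the usage is only a sample value. The order of the checks follows the code above: a null status is classified first, then the timeout, then the listener's own status.

```java
// Hypothetical restatement of the returnType decision; the enums are stand-ins, not RocketMQ types.
final class ReturnTypeSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    // status may be null when the listener threw or returned null.
    static ReturnType classify(Status status, boolean hasException, long consumeRtMillis, long timeoutMinutes) {
        if (status == null) {
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= timeoutMinutes * 60 * 1000) {
            return ReturnType.TIME_OUT; // checked before the status itself
        }
        if (status == Status.RECONSUME_LATER) {
            return ReturnType.FAILED;
        }
        return ReturnType.SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(classify(null, true, 5, 15));                       // EXCEPTION
        System.out.println(classify(Status.CONSUME_SUCCESS, false, 5, 15));    // SUCCESS
        System.out.println(classify(Status.CONSUME_SUCCESS, false, 900_000, 15)); // TIME_OUT
    }
}
```

Note that the return type is only recorded in the hook context; a slow listener that returned CONSUME_SUCCESS is reported as TIME_OUT there while the status passed on for result processing remains CONSUME_SUCCESS.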