一 生产者属性二 默认topic作用
1 Broker初始化TBW1022 生产者获取默认topic信息 三 发送消息重试四 Broker接收消息分析
1 Broker启动注册Processor2 SendMessageProcessor
2.1 Broker创建topic2.2 消息存储 一 生产者属性
private String producerGroup; private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; private volatile int defaultTopicQueueNums = 4; private int sendMsgTimeout = 3000; private int compressMsgBodyOverHowmuch = 1024 * 4; private int retryTimesWhenSendFailed = 2; private int retryTimesWhenSendAsyncFailed = 2; private int maxMessageSize = 1024 * 1024 * 4;
二 默认topic作用1 Broker初始化TBW102private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = “TBW102”;
Broker启动会定时上报topic的信息到nameSrv。
this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap
Topic配置文件topics.json,默认topic为TBW102
{"dataVersion":{"counter":1,"timestamp":1643214418621},"topicConfigTable":{......"TBW102":{"order":false,"perm":7,"readQueueNums":8,"topicFilterType":"SINGLE_TAG","topicName":"TBW102","topicSysFlag":0,"writeQueueNums":8},......}}
2 生产者获取默认topic信息private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 第一次返回默认topic信息, 这里不返回下面的判断逻辑进不去 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { ...... // 真正发送消息的方法 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ...... } } private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
三 发送消息重试private int retryTimesWhenSendFailed = 2;
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //...... int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 发送消息逻辑,Netty RemotingClient sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); } // ....... } } }
四 Broker接收消息分析 1 Broker启动注册Processorpublic class BrokerStartup { // ...... public static void main(String[] args) { start(createBrokerController(args)); } public static BrokerController createBrokerController(String[] args) { // ...... boolean initResult = controller.initialize(); // ...... } ......}public class BrokerController { public boolean initialize() throws CloneNotSupportedException { // ...... this.registerProcessor(); // ...... } public void registerProcessor() { SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this); replyMessageProcessor.registerSendMessageHook(sendMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); ClientManageProcessor clientProcessor = new ClientManageProcessor(this); this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); } // ......}
2 SendMessageProcessor 2.1 Broker创建topicpublic class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { public CompletableFuture
所有的消息存储在store/commitLogstore/consumerQueue下面是topic目录,topic目录下面是队列id标号目录,队列id编号下面是索引文件,用来记录消息commitLogOffset、msgSize、tagsCode。store/index文件记录了其它的一些索引。commitLog,consumerQueue可以指定大小。
public CompletableFuture