欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RocketMQ源码解析-Broker部分之Broker启动过程

时间:2023-07-23
目录

broker启动流程broker启动可配置参数启动入口`BrokerStartup`

1.创建brokerController2.`BrokerController`构造函数3.BrokerController初始化`initialize()`

3.1注册消息处理器`registerProcessor`3.2初始化事务消息相关的服务`initialTransaction()`3.3`initialize`总结 4.BrokerControler的`start` broker启动流程

借用一下【秃头爱健身】博主的的图,我觉得画的很好。

broker启动可配置参数

-n : 指定broker 的 namesrvAddr 地址; -h :打印命令; -c :指定配置文件的路径; -p :启动时候日志打印配置信息; -m :启动时候日志打印导入的配置信息。

启动入口BrokerStartup

broker启动的入口是在brokerStartup,方法是main

public static void main(String[] args) { //创建BrokerController start(createBrokerController(args)); }

1.创建brokerController

public static BrokerController createBrokerController(String[] args) { //设置RocketMq的版本号 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //设置broker的netty客户端的发送缓冲大小,默认是128 kb if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 131072; } //设置broker的netty客户端的接受缓冲大小,默认是128 kb if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 131072; } try { //PackageConflictDetect.detectFastjson(); //命令行选项解析 Options options = ServerUtil.buildCommandlineOptions(new Options()); //解析命令行为 ‘mqbroker’的参数 commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); //如果为空,直接退出 if (null == commandLine) { System.exit(-1); } //创建broker,netty的相关配置对象 final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); //是否使用TLS (TLS是SSL的升级版本,TLS是SSL的标准化后的产物,有1.0 1.1 1.2三个版本) nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); //设置netty的服务端监听的端口 10911,对外提供消息读写服务的端口 nettyServerConfig.setListenPort(10911); //创建消息存储配置 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); //如果broker是slave节点 if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { //比默认的40% 还要小 10 int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; //设置消息存储配置所能使用的最大内存比例,超过该内存,消息将被置换出内存, messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } //解析命令行参数'-c':指定broker的配置文件路径 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); properties2SystemEnv(properties); MixAll.properties2Object(properties, brokerConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); MixAll.properties2Object(properties, messageStoreConfig); BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //检查broker配置中的nameServer地址 String namesrvAddr = brokerConfig.getNamesrvAddr(); if (null != namesrvAddr) { try { String[] addrArray = namesrvAddr.split(";"); for (String addr : addrArray) { RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { System.out.printf( "The Name Server Address[%s] illegal, please set it as follows, "127.0.0.1:9876;192.168.0.1:9876"%n", namesrvAddr); System.exit(-3); } }//检查broker的角色 switch (messageStoreConfig.getBrokerRole()) { case ASYNC_MASTER: case SYNC_MASTER: //如果是master节点,则设置该节点brokerId=0 brokerConfig.setBrokerId(MixAll.MASTER_ID); break; case SLAVE: if (brokerConfig.getBrokerId() <= 0) { System.out.printf("Slave's brokerId must be > 0"); System.exit(-3); } break; default: break; } //是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true if (messageStoreConfig.isEnableDLegerCommitLog()) { brokerConfig.setBrokerId(-1); } //设置消息存储配置的高可用端口,10912 messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");//解析命令行参数'-p':启动时候日志打印配置信息 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig); MixAll.printObjectProperties(console, nettyServerConfig); MixAll.printObjectProperties(console, nettyClientConfig); MixAll.printObjectProperties(console, messageStoreConfig); System.exit(0); } //解析命令行参数'-m':启动时候日志打印导入的配置信息 else if (commandLine.hasOption('m')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig, true); MixAll.printObjectProperties(console, nettyServerConfig, true); MixAll.printObjectProperties(console, nettyClientConfig, true); MixAll.printObjectProperties(console, messageStoreConfig, true); System.exit(0); } log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); MixAll.printObjectProperties(log, brokerConfig); MixAll.printObjectProperties(log, nettyServerConfig); MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); //创建BrokerController final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); // 记住所有的配置以防止丢弃 controller.getConfiguration().registerConfig(properties); //初始化BrokerController boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //注册关闭的钩子方法 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long beginTime = System.currentTimeMillis(); //BrokerController的销毁方法 controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - beginTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }

broker启动会默认启动3个端口

端口说明10911接收消息推送的端口10912消息存储配置的高可用端口10909推送消息的VIP端口2.BrokerController构造函数

构造入参说明

参数类型说明brokerConfigBrokerConfig封装Broker的基本配置信息nettyServerConfigNettyServerConfig封装了broker作为对外提供消息读写操作的MQ服务器信息nettyClientConfigNettyClientConfig封装了broker作为NameServer的客户端的信息messageStoreConfigMessageStoreConfig封装消息存储Store的配置信息

public BrokerController( final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig ) { // BrokerStartup中准备的配置信息 this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; this.nettyClientConfig = nettyClientConfig; this.messageStoreConfig = messageStoreConfig; // Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件 this.consumerOffsetManager = new ConsumerOffsetManager(this); //消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.json this.topicConfigManager = new TopicConfigManager(this);//Consumer端使用pull的方式向Broker拉取消息请求的处理类 this.pullMessageProcessor = new PullMessageProcessor(this);//Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类 this.pullRequestHoldService = new PullRequestHoldService(this); //有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法 this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器 this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); //消费者管理类,并对消费者id变化进行监听 this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); //消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.json this.consumerFilterManager = new ConsumerFilterManager(this);//生产者管理 按照group进行分类 this.producerManager = new ProducerManager(); //客户端心跳连接处理类 this.clientHousekeepingService = new ClientHousekeepingService(this);//Console控制台获取Broker信息使用 this.broker2Client = new Broker2Client(this);//订阅关系管理类 this.subscriptionGroupManager = new SubscriptionGroupManager(this);//Broker对外访问的API this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);//FilterServer管理类 this.filterServerManager = new FilterServerManager(this);//Broker主从同步进度管理类 this.slaveSynchronize = new SlaveSynchronize(this);// 各种线程池的阻塞队列// 发送消息线程池队列 this.sendThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity()); this.pullThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity()); this.replyThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getReplyThreadPoolQueueCapacity()); this.queryThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.heartbeatThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); this.endTransactionThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); this.brokerFastFailure = new BrokerFastFailure(this); this.configuration = new Configuration( log, BrokerPathConfigHelper.getBrokerConfigPath(), this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); }

重点对一些核心类进行说明

参数说明ConsumerOffsetManagerConsumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件topicConfigManager消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.jsonpullMessageProcessorConsumer端使用pull的方式向Broker拉取消息请求的处理类pullRequestHoldServiceConsumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类messageArrivingListener有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法consumerIdsChangeListener消费者id变化监听器consumerManager消费者管理类,并对消费者id变化进行监听consumerFilterManager消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.jsonproducerManager生产者管理 按照group进行分类clientHousekeepingService客户端心跳连接处理类broker2ClientConsole控制台获取Broker信息使用subscriptionGroupManager订阅关系管理类brokerOuterAPIBroker对外访问的APIfilterServerManagerFilterServer管理类slaveSynchronizeBroker主从同步进度管理类3.BrokerController初始化initialize()

public boolean initialize() throws CloneNotSupportedException {//加载 topic 相关配置,文件地址为 {user.home}/store/config/topics.json boolean result = this.topicConfigManager.load(); //加载 不同的Consumer消费的进度情况 文件地址为 {user.home}/store/config/consumerOffset.json result = result && this.consumerOffsetManager.load(); //加载 订阅关系 文件地址 {user.home}/store/config/subscriptionGroup.json result = result && this.subscriptionGroupManager.load(); //加载 Consumer的过滤信息配置 文件地址 {user.home}/store/config/consumerFilter.json result = result && this.consumerFilterManager.load();//如果加载成功 if (result) { try { //创建消息存储类messageStore this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); //使用的是DLegerCommitLog,则创建DLedgerRoleChangeHandler if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); } //broker消息统计类 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } }//加载消息的日志文件,包含CommitLog,ConsumeQueue等 result = result && this.messageStore.load();//如果加载成功 if (result) { //开启服务端 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); //设置10909的服务端口 fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); //开启10909的服务端口,这个端口只给生产者使用 this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); //处理消息生产者发送的生成消息请求相关的线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_"));//处理消费者发出的消费消息请求相关的线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); //处理回复消息api的线程池 this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getProcessReplyMessageThreadPoolNums(), this.brokerConfig.getProcessReplyMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.replyThreadPoolQueue, new ThreadFactoryImpl("ProcessReplyMessageThread_"));//查询线程 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_")); //省略一些线程池//为客户端注册需要处理API指令事件,以及消息发送和消费的回调方法 this.registerProcessor(); final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; //每天执行一次,统计昨天put的message和get的message this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS);// 默认5s执行一次,会把消费这的偏移量存到文件中 ${user.home}/store/config/consumerOffset.json.json this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);// 默认10s执行一次,会把消费者的消息过滤的信息持久化到文件 ${user.home}/store/config/consumerFilter.json this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);// 每3分钟,当消费者消费太慢,会禁用到消费者组 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES);//打印当前的Send Queue Size,Pull Queue Size,Query Queue Size,Transaction Queue Size this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS);//每隔一分钟打印一次,dispath的消息偏移量和总的消息偏移量的差值 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); if (this.brokerConfig.getNamesrvAddr() != null) { //更新nameServer地址 this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { //没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 每隔2分钟从name-server地址服务器拉取最新的配置 // 这个是实现name-server动态增减的唯一方法 BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically = false; } else { this.updateMasterHAServerAddrPeriodically = true; } } else { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //定时打印master与slave的差距 BrokerController.this.printMasterAndSlaveDiff(); } catch (Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } }//初始化事务消息相关的服务 initialTransaction(); //初始化权限管理器 initialAcl(); //初始化RPC调用的钩子 initialRpcHooks(); } return result; }

3.1注册消息处理器registerProcessor

rocketmq中有许多线程执行器,包括sendMessageExecutor(发送消息),pullMessageExecutor(拉取消息),queryMessageExecutor(查询消息),adminBrokerExecutor(默认处理)。这些线程执行器会通过registerProcessor注册到NettyRemotingServer ,每一个RequestCode会有一个对应的执行器,最终会以RequestCode为键放到一个HashMap中,当请求到达nettyServer时会根据RequestCode把请求分发到不同的执行器去处理请求

public void registerProcessor() { SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); //省略其他的处理器

最终放到processorTable的map中

@Override public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = this.publicExecutor; } Pair pair = new Pair(processor, executorThis); this.processorTable.put(requestCode, pair); }

相关的RequestCode说明一下:

事件名code说明SEND_MESSAGE10生产者发送信息SEND_MESSAGE_V2310生产者发送信息SEND_BATCH_MESSAGE320批量发送消息CONSUMER_SEND_MSG_BACK36消费端消费失败的时候返回的消息PULL_MESSAGE11消费者拉取消息SEND_REPLY_MESSAGE324消费者回包消息,可以用类似RPC调用3.2初始化事务消息相关的服务initialTransaction()

服务加载方式是Java的SPI方式。

private void initialTransaction() { //加载TransactionalMessageService服务,实现类为TransactionalMessageServiceImpl this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); if (null == this.transactionalMessageService) { this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore())); log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName()); } //AbstractTransactionalMessageCheckListener对应的服务类为LogTransactionalMessageCheckListener ,其中实现为空实现 this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class); if (null == this.transactionalMessageCheckListener) { this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener(); log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName()); } //设置对应的brokerController到AbstractTransactionalMessageCheckListener中 this.transactionalMessageCheckListener.setBrokerController(this); //创建TransactionalMessageCheckService,服务是周期检查事务的服务, this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); } }

3.3initialize总结

initialize方法相应的逻辑相对来说比较多,稍微总结为已下几步:

1.服务器内的相关日志文件的加载,{user.home}/store/config/ 文件目录下的json配置文件(包含topics,consumerOffset,subscriptionGroup,consumerFilter)。

2.如果上述文件加载成功,会启动对应的Broker客户端,然后创建一些线程池,在后面注册 API 指令事件后会监听到API的时候会进行处理
3.注册事件到对应的Broker客户端上,然后会记录对应的API事件和对应线程池封装到一个对象中
4.启动一些 定时任务,这些任务比如记录Broker状态,消费进度持久化等任务
5.初始化一些服务,比如事务相关(周期检查事务),消息权限校验初始化和Rpc调用钩子相关服务。对应的服务加载方式是Java的SPI方式。

4.BrokerControler的start

controller.start(); public void start() throws Exception { if (this.messageStore != null) { // 启动消息存储服务DefaultMessageStore,其会对/store/lock文件加锁, // 以确保在broker运行期间只有一个broker实例操作/store目录 this.messageStore.start(); } if (this.remotingServer != null) { // 启动Netty服务监听10911端口,对外提供服务(消息生产、消费) this.remotingServer.start(); } if (this.fastRemotingServer != null) { // 监听10909端口 this.fastRemotingServer.start(); } if (this.fileWatchService != null) { // fileWatchService与TLS有关,todo tls解析 this.fileWatchService.start(); } if (this.brokerOuterAPI != null) { // 启动Netty客户端netty,broker使用其向外发送数据,比如:向NameServer上报心跳、topic信息。 this.brokerOuterAPI.start(); } if (this.pullRequestHoldService != null) { // 长轮询机制hold住拉取消息请求的服务 this.pullRequestHoldService.start(); } if (this.clientHousekeepingService != null) { // 每10s检查一遍非活动的连接服务 this.clientHousekeepingService.start(); } if (this.filterServerManager != null) { this.filterServerManager.start(); } if (!messageStoreConfig.isEnableDLegerCommitLog()) { // 处理HA startProcessorByHa(messageStoreConfig.getBrokerRole()); // 启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等 handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); // 向所有的nameserver发送本机所有的主题数据; // 包括主题名、读队列个数、写队列个数、队列权限、是否有序等 this.registerBrokerAll(true, false, true); } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 定时向NameServer注册Broker,最小每10s。 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); if (this.brokerStatsManager != null) { // Broker信息统计,这个没有具体的实现;所以暂时不用管 this.brokerStatsManager.start(); } if (this.brokerFastFailure != null) { // Broker对请求队列中的请求进行快速失败,返回`Broker繁忙、请稍后重试`信息 this.brokerFastFailure.start(); }}

对这几个服务进行说明一下:

服务名类型说明messageStoreDefaultMessageStore处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等remotingServerRemotingServerBroker的服务端,处理消费者和生产者的请求fastRemotingServerRemotingServer只给消息生产者的服务端fileWatchServiceFileWatchService启动监控服务连接时用到的SSL连接文件的服务brokerOuterAPIBrokerOuterAPIRocketMQ控制台跟Broker交互时候的客户端pullRequestHoldServicePullRequestHoldService处理push模式消费,或者延迟消费的服务clientHousekeepingServiceClientHousekeepingService心跳连接用的服务filterServerManagerFilterServerManager消息过滤的服务transactionalMessageCheckServiceTransactionalMessageCheckService定期检查和处理事务消息的服务slaveSynchronizeSlaveSynchronize主从之间topic,消费偏移等信息同步用的

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。