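Contents
Questions
Broker receive flow diagram
Message reception
1. Startup entry point `NettyRemotingServer.start()`
2. Creating the NettyServer communication channel
3. The core Netty receive handler `NettyServerHandler`
3.1 Request handling `processRequestCommand`
3.2 Message request processor `SendMessageProcessor`
3.3 Message storage `DefaultMessageStore`

Questions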
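1. After a producer sends a message, how does the broker receive it?
2. After the broker receives the message, how does it process it?

Broker receive flow diagram

Message reception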
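In the earlier article "RocketMQ Source Code Analysis, Broker Part: The Broker Startup Process", we saw that starting the broker calls BrokerStartup.start(). RocketMQ uses Netty for its underlying communication, so the broker also receives messages through Netty and then processes them.

1. Startup entry point NettyRemotingServer.start()

NettyRemotingServer.start() is in fact invoked as part of BrokerStartup.start(): that method calls BrokerController.start(), shown below, which starts the message store and then the remoting server.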
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        if (this.remotingServer != null) {
            // NettyRemotingServer.start()
            this.remotingServer.start();
        }
    }

2. Creating the NettyServer communication channel

    @Override
    public void start() {
        // Executor group that runs the codec and business handlers off the I/O threads
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        prepareSharableHandlers();

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // Bind the listen port and wait until the bind completes
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        // Scan the response table once per second, after an initial 3-second delay
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
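
The anonymous ThreadFactory passed to DefaultEventExecutorGroup above only gives each codec thread a numbered name. That pattern can be tried in isolation with the plain JDK. The following is a minimal sketch; the class name NamedThreadFactoryDemo and the helper numbered are hypothetical and not part of RocketMQ.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it mirrors the naming pattern only, not RocketMQ code.
final class NamedThreadFactoryDemo {

    /** Builds a factory whose threads are named prefix + 1, prefix + 2, ... */
    static ThreadFactory numbered(String prefix) {
        AtomicInteger index = new AtomicInteger(0);
        // incrementAndGet is atomic, so concurrent callers never share a number
        return r -> new Thread(r, prefix + index.incrementAndGet());
    }

    public static void main(String[] args) {
        ThreadFactory factory = numbered("NettyServerCodecThread_");
        System.out.println(factory.newThread(() -> { }).getName());
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

Numbered names of this kind make it easier to tell the codec threads apart from Netty's boss and selector threads in a thread dump.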
prepareSharableHandlers
    private void prepareSharableHandlers() {
        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
        encoder = new NettyEncoder();
        connectionManageHandler = new NettyConnectManageHandler();
        serverHandler = new NettyServerHandler();
    }
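
3. The core Netty receive handler NettyServerHandler

NettyServerHandler is built on Netty's ChannelHandler (it extends SimpleChannelInboundHandler) and intercepts the messages sent by producers. Readers who want more depth can study the Netty framework on their own; here we focus on NettyServerHandler's processing flow.
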
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // Handle the received message
            processMessageReceived(ctx, msg);
        }
    }

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // Handle a request command
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
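
3.1 Request handling processRequestCommand

This method looks up the processor registered for the request code in processorTable, the code-to-processor mapping populated through registerProcessor during BrokerController startup, and runs that processor's handling logic.
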
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // Look up the processor and its executor by request code; for a send request this is SendMessageProcessor
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // Resolve the remote address
                        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        // Pre-processing hooks
                        doBeforeRpcHooks(remoteAddr, cmd);
                        final RemotingResponseCallback callback = response -> {
                            doAfterRpcHooks(remoteAddr, cmd, response);
                            // One-way requests get no response
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    // Echo the request's opaque so the client can match the response
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        };
                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
                            // Process the request asynchronously
                            processor.asyncProcessRequest(ctx, cmd, callback);
                        } else {
                            NettyRequestProcessor processor = pair.getObject1();
                            // Process the request synchronously (SendMessageProcessor)
                            RemotingCommand response = processor.processRequest(ctx, cmd);
                            callback.callback(response);
                        }
                    } catch (Throwable e) {
                        // Error handling is omitted in this excerpt
                    }
                }
            };
            // The rest of the method, which hands run to the processor's executor, is omitted in this excerpt
        }
    }
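
The lookup-and-reply logic above can be reduced to a few lines of plain Java. The following is a minimal sketch under stated assumptions: Request, Response and ProcessorTableDemo are hypothetical stand-ins (not RocketMQ's RemotingCommand or NettyRemotingAbstract), processors are modelled as plain functions, and the request code 10 in main is only an illustrative value. It shows three behaviours from the excerpt: falling back to a default processor, echoing the request's opaque on the response, and sending nothing back for one-way requests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, simplified model of the processor table; not RocketMQ code.
final class ProcessorTableDemo {

    static final class Request {
        final int code;
        final int opaque;
        final boolean oneway;

        Request(int code, int opaque, boolean oneway) {
            this.code = code;
            this.opaque = opaque;
            this.oneway = oneway;
        }
    }

    static final class Response {
        final String body;
        int opaque;

        Response(String body) {
            this.body = body;
        }
    }

    private final Map<Integer, Function<Request, Response>> processorTable = new HashMap<>();
    private final Function<Request, Response> defaultProcessor;

    ProcessorTableDemo(Function<Request, Response> defaultProcessor) {
        this.defaultProcessor = Objects.requireNonNull(defaultProcessor);
    }

    void registerProcessor(int code, Function<Request, Response> processor) {
        processorTable.put(code, processor);
    }

    /** Returns the response to write back, or empty when nothing should be sent. */
    Optional<Response> process(Request request) {
        // Use the processor registered for this code, else the default one
        Function<Request, Response> processor = processorTable.getOrDefault(request.code, defaultProcessor);
        Response response = processor.apply(request);
        if (request.oneway || response == null) {
            return Optional.empty();
        }
        // Echo the opaque so the caller can match the response to its request
        response.opaque = request.opaque;
        return Optional.of(response);
    }

    public static void main(String[] args) {
        ProcessorTableDemo table = new ProcessorTableDemo(req -> new Response("default"));
        table.registerProcessor(10, req -> new Response("send"));
        Response response = table.process(new Request(10, 7, false)).get();
        System.out.println(response.body + " opaque=" + response.opaque);
    }
}
```

In the real method the processor runs on an executor paired with it in the table; this sketch calls it inline to keep the example short.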

3.2 Message request processor SendMessageProcessor

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                              RemotingCommand request) throws RemotingCommandException {
            RemotingCommand response = null;
            try {
                // The synchronous entry point simply blocks on the asynchronous one
                response = asyncProcessRequest(ctx, request).get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("process SendMessage error, request : " + request.toString(), e);
            }
            return response;
        }

        public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                      RemotingCommand request) throws RemotingCommandException {
            final SendMessageContext mqtraceContext;
            switch (request.getCode()) {
                // Retry message sent back by a consumer
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    return this.asyncConsumerSendMsgBack(ctx, request);
                // Normal message sent by a producer
                default:
                    // Build the request header object from the request
                    SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                    if (requestHeader == null) {
                        return CompletableFuture.completedFuture(null);
                    }
                    mqtraceContext = buildMsgContext(ctx, requestHeader);
                    this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                    if (requestHeader.isBatch()) {
                        // Handle a batch of messages
                        return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                    } else {
                        // Handle a single message
                        return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                    }
            }
        }

        // Other methods are omitted in this excerpt
    }
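
processRequest above is a thin blocking wrapper around asyncProcessRequest: it calls get() on the returned CompletableFuture and returns null if the future fails. The following is a minimal sketch of that pattern with the plain JDK. SyncOverAsyncDemo, its methods, and the "empty request fails" rule are hypothetical and exist only to exercise both paths.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo of a synchronous method that wraps an asynchronous one.
final class SyncOverAsyncDemo {

    /** Asynchronous path: completes with a result, or exceptionally for an empty request. */
    static CompletableFuture<String> asyncProcess(String request) {
        if (request.isEmpty()) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty request"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "stored:" + request);
    }

    /** Synchronous path: blocks on the future and returns null on failure, as the excerpt does. */
    static String process(String request) {
        try {
            return asyncProcess(request).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(process("m1"));
        System.out.println(process(""));
    }
}
```

One consequence of this design is that the calling thread stays blocked until the store completes, which is why the remoting layer prefers the asynchronous entry point when the processor offers one.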

Let us focus on the logic for handling a single message.

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        // Initialize the response data
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

        // A code other than -1 means preSend already produced a final response
        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        // Message body
        final byte[] body = request.getBody();

        // Queue id the message is sent to
        int queueIdInt = requestHeader.getQueueId();
        // Configuration of the topic the message is sent to
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        // If the queue id is negative, pick a random queue id
        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        // Wrap the data in a MessageExtBrokerInner
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        // Handle retry and dead-letter queue messages
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        MessageAccessor.setProperties(msgInner, origProps);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
            // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
            // It works for most cases. In some cases msgInner.setPropertiesString is invoked later and replaces it.
            String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
            origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
        } else {
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        }

        CompletableFuture<PutMessageResult> putMessageResult = null;
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        // Transactional messages go through the transaction handling logic
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            // Check whether the broker accepts transactional messages
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // Normal messages are stored through the MessageStore
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }
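
Two small decisions in asyncSendMessage are easy to check in isolation: falling back to a random queue when the request carries a negative queue id, and deciding from the message properties whether the message is a prepared transactional message. The following is a minimal sketch under stated assumptions. SendPathDemo and its method names are hypothetical, the random choice uses ThreadLocalRandom rather than RocketMQ's own randomQueueId, writeQueueNums is assumed to be positive, and the property key "TRAN_MSG" is written out as a literal instead of going through MessageConst.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helpers that mirror two checks from asyncSendMessage; not RocketMQ code.
final class SendPathDemo {

    /** Keeps a non-negative queue id; otherwise picks one in [0, writeQueueNums). */
    static int resolveQueueId(int requestedQueueId, int writeQueueNums) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        // Assumes writeQueueNums > 0; nextInt throws IllegalArgumentException otherwise
        return ThreadLocalRandom.current().nextInt(writeQueueNums);
    }

    /** True only when the transaction-prepared property is present and parses as "true". */
    static boolean isTransactionPrepared(Map<String, String> properties) {
        // Boolean.parseBoolean(null) is false, so a missing key needs no separate check
        return Boolean.parseBoolean(properties.get("TRAN_MSG"));
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("TRAN_MSG", "true");
        System.out.println(resolveQueueId(-1, 4));
        System.out.println(isTransactionPrepared(properties));
    }
}
```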
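
3.3 Message storage DefaultMessageStore

For now it is enough to know that the message is ultimately stored through the commitLog; the message storage module will be studied and explained in a later article.
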
    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
        return putResultFuture;
    }
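
In summary, handling a single message takes roughly these steps:

1. Initialize the response data.
2. Get the target queue and the topic configuration; if queueId < 0, pick a queue at random.
3. Build the message object (MessageExtBrokerInner).
4. Handle retry and dead-letter messages. For RETRY-type messages, once the maximum number of consumption attempts is exceeded, the topic is changed to "%DLQ%" + the group name, which puts the message in the dead-letter queue.
5. If the message is transactional, check whether the broker is configured to reject transactional messages.
6. Use the MessageStore component to store the message in local files. Only the CommitLog file is written at this point; the ConsumeQueue and IndexFile files are written asynchronously by a background thread.
7. Handle the result of storing the message.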
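
The retry and dead-letter step in the summary can be sketched as a pure function on the topic name. This is a minimal sketch under stated assumptions: RetryDlqDemo and resolveTopic are hypothetical, the retry topic is assumed to be "%RETRY%" + the group name, and the boundary (moving to the dead-letter topic once reconsumeTimes reaches maxReconsumeTimes) is an assumption of this sketch rather than something the text above specifies.

```java
// Hypothetical sketch of the retry-to-dead-letter topic switch; not RocketMQ code.
final class RetryDlqDemo {

    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    /**
     * Returns the topic the message should be stored under.
     * Non-retry topics are returned unchanged.
     */
    static String resolveTopic(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic == null || !topic.startsWith(RETRY_PREFIX)) {
            return topic;
        }
        // The consumer group name is whatever follows the retry prefix
        String group = topic.substring(RETRY_PREFIX.length());
        // Assumed boundary: switch to the dead-letter topic once the limit is reached
        return reconsumeTimes >= maxReconsumeTimes ? DLQ_PREFIX + group : topic;
    }

    public static void main(String[] args) {
        System.out.println(resolveTopic("%RETRY%groupA", 3, 16));
        System.out.println(resolveTopic("%RETRY%groupA", 16, 16));
    }
}
```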