Tomcat 请求处理流程



Tomcat 请求处理流程 请求流程

设计了这么多层次的容器,Tomcat是怎么确定每一个请求应该由哪个Wrapper容器里的 Servlet来处理的呢?



Mapper组件里保存了Web应用的配置信息,其实就是容器组件与访问路径的映射关系, 比如Host容器里配置的域名、Context容器里的Web应用路径,以及Wrapper容器里 Servlet映射的路径,你可以想象这些配置信息就是一个多层次的Map。

当一个请求到来时,Mapper组件通过解析请求URL里的域名和路径,再到自己保存的 Map里去查找,就能定位到一个Servlet。请你注意,一个请求URL最后只会定位到一个 Wrapper容器,也就是一个Servlet。

下面的示意图中 , 就描述了 当用户请求链接 http://www.itcast.cn/bbs/findAll 之 后, 是如何找到最终处理业务逻辑的servlet 。

那上面这幅图只是描述了根据请求的URL如何查找到需要执行的Servlet , 那么下面我们 再来解析一下 , 从Tomcat的设计架构层面来分析Tomcat的请求处理






CoyoteAdaptor组件负责将Connector组件和Engine容器关联起来,把生成的 Request对象和响应对象Response传递到Engine容器中,调用 Pipeline。

Engine容器的管道开始处理,管道中包含若干个Valve、每个Valve负责部分处理逻 辑。执行完Valve后会执行基础的 Valve–StandardEnginevalve,负责调用Host容器的 Pipeline。

Host容器的管道开始处理,流程类似,最后执行 Context容器的Pipeline。

Context容器的管道开始处理,流程类似,最后执行 Wrapper容器的Pipeline。

Wrapper容器的管道开始处理,流程类似,最后执行 Wrapper容器对应的Servlet对象 的 处理方法。


在前面所讲解的Tomcat的整体架构中,我们发现Tomcat中的各个组件各司其职,组件 之间松耦合,确保了整体架构的可伸缩性和可拓展性,那么在组件内部,如何增强组件 的灵活性和拓展性呢? 在Tomcat中,每个Container组件采用责任链模式来完成具体的 请求处理。

在Tomcat中定义了Pipeline 和 Valve 两个接口,Pipeline 用于构建责任链, 后者代表责 任链上的每个处理器。Pipeline 中维护了一个基础的Valve,它始终位于Pipeline的末端 (最后执行),封装了具体的请求处理和输出响应的过程。当然,我们也可以调用 addValve()方法, 为Pipeline 添加其他的Valve, 后添加的Valve 位于基础的Valve之 前,并按照添加顺序执行。Pipiline通过获得首个Valve来启动整合链条的执行 。





第一部分: 请求由Endpoint捕获,并转交给Processor处理.




protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; while (running) { while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); SocketChannel socket = null; try { //接受客户端请求 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }



public void run() { while (true) { ....... while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); //开始正式执行请求流程 processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }

2.1 Poller.processKey()


protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { //开始处理会话 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } }

2.2 AbstractEndpoint.processSocket()



public boolean processSocket(SocketWrapperbase socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } //获取socket的处理器 SocketProcessorbase sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } //获取到线程池 Executor executor = getExecutor(); if (dispatch && executor != null) { //由线程池调用一个线程来执行Socket处理器 executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }



protected void doRun() { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); .... if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket if (event == null) { //获取Handler处理器:Servlet(处理器),调度处理器的处理方法 state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { close(socket, key); } } .... } }



@Override public SocketState process(SocketWrapperbase wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process", wrapper.getSocket(), status)); } if (wrapper == null) { // Nothing to do、Socket has been closed. return SocketState.CLOSED; } //获取Socket S socket = wrapper.getSocket(); //交给Processor处理 -> 真正开始处理请求 Processor processor = connections.get(socket); .... do { //开始解析请求 state = processor.process(wrapper, status); ..... } } while ( state == SocketState.UPGRADING);}





public SocketState process(SocketWrapperbase<?> socketWrapper, SocketEvent status) throws IOException { SocketState state = SocketState.CLOSED; Iterator dispatches = null; do { if (dispatches != null) { DispatchType nextDispatch = dispatches.next(); if (getLog().isDebugEnabled()) { getLog().debug("Processing dispatch type: [" + nextDispatch + "]"); } state = dispatch(nextDispatch.getSocketStatus()); if (!dispatches.hasNext()) { state = checkForPipelinedData(state, socketWrapper); } } else if (status == SocketEvent.DISCONNECT) { } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) { state = dispatch(status); state = checkForPipelinedData(state, socketWrapper); } else if (status == SocketEvent.OPEN_WRITE) { // Extra write event likely after async, ignore state = SocketState.LONG; } else if (status == SocketEvent.OPEN_READ) { //调度Http11Processor.service方法 state = service(socketWrapper); } else if (status == SocketEvent.CONNECT_FAIL) { logAccess(socketWrapper); } else { state = SocketState.CLOSED; } .... } } while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED); return state; }


@Override public SocketState service(SocketWrapperbase<?> socketWrapper) ...、 while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE && !endpoint.isPaused()) { // Parsing the request header try { //解析socket请求数据中每一行,按照http协议解析请求头----只负责解析请求头 if (!inputBuffer.parseRequestLine(keptAlive)) { if (inputBuffer.getParsingRequestLinePhase() == -1) { return SocketState.UPGRADING; } else if (handleIncompleteRequestLineRead()) { break; } } if (endpoint.isPaused()) { // 503 - Service unavailable response.setStatus(503); setErrorState(ErrorState.CLOSE_CLEAN, null); } else { keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); if (!inputBuffer.parseHeaders()) { // We've read part of the request, don't recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!disableUploadTimeout) { socketWrapper.setReadTimeout(connectionUploadTimeout); } } } catch (IOException e) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.header.parse"), e); } setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); break; } catch (Throwable t) { .... } // 400 - Bad Request response.setStatus(400); setErrorState(ErrorState.CLOSE_CLEAN, t); } // Has an upgrade been requested? Enumeration connectionValues = request.getMimeHeaders().values("Connection"); boolean foundUpgrade = false; while (connectionValues.hasMoreElements() && !foundUpgrade) { foundUpgrade = connectionValues.nextElement().toLowerCase( Locale.ENGLISH).contains("upgrade"); } if (foundUpgrade) { // Check the protocol String requestedProtocol = request.getHeader("Upgrade"); UpgradeProtocol upgradeProtocol = httpUpgradeProtocols.get(requestedProtocol); if (upgradeProtocol != null) { if (upgradeProtocol.accept(request)) { // TODO Figure out how to handle request bodies at this // point. response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); response.setHeader("Connection", "Upgrade"); response.setHeader("Upgrade", requestedProtocol); action(ActionCode.CLOSE, null); getAdapter().log(request, response, 0); InternalHttpUpgradeHandler upgradeHandler = upgradeProtocol.getInternalUpgradeHandler( getAdapter(), cloneRequest(request)); UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null); action(ActionCode.UPGRADE, upgradeToken); return SocketState.UPGRADING; } } } if (getErrorState().isIoAllowed()) { // Setting up filters, and parse some request headers rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { //After reading the request headers, we have to setup the request filters. prepareRequest(); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.request.prepare"), t); } // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); } } ... // Process the request in the adapter if (getErrorState().isIoAllowed()) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); //将上面socket请求封装而成的request和response对象,再交给适配器进行处理 //获取Coyote适配器,并调用其service方法 getAdapter().service(request, response); ...





@Override public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception { Request request = (Request) req.getNote(ADAPTER_NOTES); Response response = (Response) res.getNote(ADAPTER_NOTES); ......、 if (connector.getXpoweredBy()) { response.addHeader("X-Powered-By", POWERED_BY); } boolean async = false; boolean postParseSuccess = false; req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get()); try { // Parse and set Catalina and configuration specific // request parameters //解析和设置 Catalina 和特定配置,包括session的查找 //在这里将Request和Response装换成Servlet容器中处理的Request和Response postParseSuccess = postParseRequest(req, request, res, response); if (postParseSuccess) { //check valves if we support async request.setAsyncSupported( connector.getService().getContainer().getPipeline().isAsyncSupported()); // Calling the container //获取Engine容器的第一个Valve,即StandardEnginevalve,并执行. connector.getService().getContainer().getPipeline().getFirst().invoke( request, response); } .......



因为这里默认只有一个标准的Engine value实现:

@Override public final void invoke(Request request, Response response) throws IOException, ServletException { // Select the Host to be used for this Request Host host = request.getHost(); if (host == null) { response.sendError (HttpServletResponse.SC_BAD_REQUEST, sm.getString("standardEngine.noHost", request.getServerName())); return; } if (request.isAsyncSupported()) { request.setAsyncSupported(host.getPipeline().isAsyncSupported()); } // Ask this Host to process this request //这里调用Host管道中的每个pipeline host.getPipeline().getFirst().invoke(request, response); }






// Select the Wrapper to be used for this Request Wrapper wrapper = request.getWrapper();


wrapper.getPipeline().getFirst().invoke(request, response);



// Allocate a servlet instance to process this request try { if (!unavailable) { servlet = wrapper.allocate(); }


// Create the filter chain for this request ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);


public static ApplicationFilterChain createFilterChain(ServletRequest request, Wrapper wrapper, Servlet servlet) { // If there is no servlet to execute, return null if (servlet == null) return null; // Create and initialize a filter chain object ApplicationFilterChain filterChain = null; if (request instanceof Request) { Request req = (Request) request; if (Globals.IS_SECURITY_ENABLED) { // Security: Do not recycle filterChain = new ApplicationFilterChain(); } else { filterChain = (ApplicationFilterChain) req.getFilterChain(); if (filterChain == null) { filterChain = new ApplicationFilterChain(); req.setFilterChain(filterChain); } } } else { // Request dispatcher in use filterChain = new ApplicationFilterChain(); } filterChain.setServlet(servlet); filterChain.setServletSupportsAsync(wrapper.isAsyncSupported()); // Acquire the filter mappings for this Context StandardContext context = (StandardContext) wrapper.getParent(); //wrapper的父容器context中的过滤器集合 FilterMap filterMaps[] = context.findFilterMaps(); // If there are no filter mappings, we are done if ((filterMaps == null) || (filterMaps.length == 0)) return (filterChain); // Acquire the information we will need to match filter mappings DispatcherType dispatcher = (DispatcherType) request.getAttribute(Globals.DISPATCHER_TYPE_ATTR); String requestPath = null; Object attribute = request.getAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR); if (attribute != null){ requestPath = attribute.toString(); } String servletName = wrapper.getName(); // Add the relevant path-mapped filters to this filter chain for (int i = 0; i < filterMaps.length; i++) { if (!matchDispatcher(filterMaps[i] ,dispatcher)) { continue; } if (!matchFiltersURL(filterMaps[i], requestPath)) continue; ApplicationFilterConfig filterConfig = (ApplicationFilterConfig) context.findFilterConfig(filterMaps[i].getFilterName()); if (filterConfig == null) { // FIXME - log configuration problem continue; } //将集合中符合要求当前请求的过滤器都加入过滤器链中 filterChain.addFilter(filterConfig); } // Add filters that match on servlet name second for (int i = 0; i < filterMaps.length; i++) { if (!matchDispatcher(filterMaps[i] ,dispatcher)) { continue; } if (!matchFiltersServlet(filterMaps[i], servletName)) continue; ApplicationFilterConfig filterConfig = (ApplicationFilterConfig) context.findFilterConfig(filterMaps[i].getFilterName()); if (filterConfig == null) { // FIXME - log configuration problem continue; } //将集合中符合要求当前请求的过滤器都加入过滤器链中 filterChain.addFilter(filterConfig); } // Return the completed filter chain //返回构建好的过滤器链 return filterChain; }



filterChain.doFilter (request.getRequest(), response.getResponse());






private void internalDoFilter(ServletRequest request, ServletResponse response) throws IOException, ServletException { // Call the next filter if there is one if (pos < n) { ApplicationFilterConfig filterConfig = filters[pos++]; try { Filter filter = filterConfig.getFilter(); if (request.isAsyncSupported() && "false".equalsIgnoreCase( filterConfig.getFilterDef().getAsyncSupported())) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } if( Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal(); Object[] args = new Object[]{req, res, this}; SecurityUtil.doAsPrivilege ("doFilter", filter, classType, args, principal); } else { //重点代码---第三个参数是当前过滤器链对象--ApplicationFilterChain filter.doFilter(request, response, this); } } catch (IOException | ServletException | RuntimeException e) { throw e; } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); ExceptionUtils.handleThrowable(e); throw new ServletException(sm.getString("filterChain.filter"), e); } return; } // We fell off the end of the chain -- call the servlet instance try { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(request); lastServicedResponse.set(response); } if (request.isAsyncSupported() && !servletSupportsAsync) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } // Use potentially wrapped request from this point if ((request instanceof HttpServletRequest) && (response instanceof HttpServletResponse) && Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal(); Object[] args = new Object[]{req, res}; SecurityUtil.doAsPrivilege("service", servlet, classTypeUsedInService, args, principal); } else { //过滤器链调用完毕,真正调用servlet的方法 servlet.service(request, response); } } catch (IOException | ServletException | RuntimeException e) { throw e; } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); ExceptionUtils.handleThrowable(e); throw new ServletException(sm.getString("filterChain.servlet"), e); } finally { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(null); lastServicedResponse.set(null); } } }




protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String method = req.getMethod(); if (method.equals(METHOD_GET)) { long lastModified = getLastModified(req); if (lastModified == -1) { // servlet doesn't support if-modified-since, no reason // to go through further expensive logic doGet(req, resp); } else { long ifModifiedSince; try { ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE); } catch (IllegalArgumentException iae) { // Invalid date header - proceed as if none was set ifModifiedSince = -1; } if (ifModifiedSince < (lastModified / 1000 * 1000)) { // If the servlet mod time is later, call doGet() // Round down to the nearest second for a proper compare // A ifModifiedSince of -1 will always be less maybeSetLastModified(resp, lastModified); doGet(req, resp); } else { resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED); } } } else if (method.equals(METHOD_HEAD)) { long lastModified = getLastModified(req); maybeSetLastModified(resp, lastModified); doHead(req, resp); } else if (method.equals(METHOD_POST)) { doPost(req, resp); } else if (method.equals(METHOD_PUT)) { doPut(req, resp); } else if (method.equals(METHOD_DELETE)) { doDelete(req, resp); } else if (method.equals(METHOD_OPTIONS)) { doOptions(req,resp); } else if (method.equals(METHOD_TRACE)) { doTrace(req,resp); } else { // // Note that this means NO servlet supports whatever // method was requested, anywhere on this server. // String errMsg = lStrings.getString("http.method_not_implemented"); Object[] errArgs = new Object[1]; errArgs[0] = method; errMsg = MessageFormat.format(errMsg, errArgs); resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg); } }




@Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { if (req.getDispatcherType() == DispatcherType.ERROR) { doGet(req, resp); } else { super.service(req, resp); } }


else { // // Note that this means NO servlet supports whatever // method was requested, anywhere on this server. // String errMsg = lStrings.getString("http.method_not_implemented"); Object[] errArgs = new Object[1]; errArgs[0] = method; errMsg = MessageFormat.format(errMsg, errArgs); resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg); }


