Tomcat 请求处理流程
请求流程请求流程源码解析源码研究
1.Acceptor.run()2.Poller.run()
2.1 Poller.processKey()2.2 AbstractEndpoint.processSocket() 3、NioEndpoint.SocketProcessor.doRun()4、AbstractProtocol.ConnectionHandler.process5.AbstractProcessorLight.process()6.Http11Processor的service方法7.CoyoteAdapter.service()8.StandardEnginevalve#invoke
9.StandardHostValve#invoke10.StandardContextValve#invoke10.StandardWrapperValve#invoke11.ApplicationFilterChain#doFilter12.ApplicationFilterChain#internalDoFilter13.HttpServlet#service13.DefaultServlet#service
Tomcat 请求处理流程 请求流程
设计了这么多层次的容器,Tomcat是怎么确定每一个请求应该由哪个Wrapper容器里的 Servlet来处理的呢?
答案是,Tomcat是用Mapper组件来完成这个任务的。
Mapper组件的功能就是将用户请求的URL定位到一个Servlet,它的工作原理是:
Mapper组件里保存了Web应用的配置信息,其实就是容器组件与访问路径的映射关系, 比如Host容器里配置的域名、Context容器里的Web应用路径,以及Wrapper容器里 Servlet映射的路径,你可以想象这些配置信息就是一个多层次的Map。
当一个请求到来时,Mapper组件通过解析请求URL里的域名和路径,再到自己保存的 Map里去查找,就能定位到一个Servlet。请你注意,一个请求URL最后只会定位到一个 Wrapper容器,也就是一个Servlet。
下面的示意图中 , 就描述了 当用户请求链接 http://www.itcast.cn/bbs/findAll 之 后, 是如何找到最终处理业务逻辑的servlet 。
那上面这幅图只是描述了根据请求的URL如何查找到需要执行的Servlet , 那么下面我们 再来解析一下 , 从Tomcat的设计架构层面来分析Tomcat的请求处理
步骤如下:
Connector组件Endpoint中的Acceptor监听客户端套接字连接并接收Socket。
将连接交给线程池Executor处理,开始执行请求响应任务。
Processor组件读取消息报文,解析请求行、请求体、请求头,封装成Request对象。
Mapper组件根据请求行的URL值和请求头的Host值匹配由哪个Host容器、Context容器、Wrapper容器处理请求。
CoyoteAdaptor组件负责将Connector组件和Engine容器关联起来,把生成的 Request对象和响应对象Response传递到Engine容器中,调用 Pipeline。
Engine容器的管道开始处理,管道中包含若干个Valve、每个Valve负责部分处理逻 辑。执行完Valve后会执行基础的 Valve–StandardEnginevalve,负责调用Host容器的 Pipeline。
Host容器的管道开始处理,流程类似,最后执行 Context容器的Pipeline。
Context容器的管道开始处理,流程类似,最后执行 Wrapper容器的Pipeline。
Wrapper容器的管道开始处理,流程类似,最后执行 Wrapper容器对应的Servlet对象 的 处理方法。
请求流程源码解析
在前面所讲解的Tomcat的整体架构中,我们发现Tomcat中的各个组件各司其职,组件 之间松耦合,确保了整体架构的可伸缩性和可拓展性,那么在组件内部,如何增强组件 的灵活性和拓展性呢? 在Tomcat中,每个Container组件采用责任链模式来完成具体的 请求处理。
在Tomcat中定义了Pipeline 和 Valve 两个接口,Pipeline 用于构建责任链, 后者代表责 任链上的每个处理器。Pipeline 中维护了一个基础的Valve,它始终位于Pipeline的末端 (最后执行),封装了具体的请求处理和输出响应的过程。当然,我们也可以调用 addValve()方法, 为Pipeline 添加其他的Valve, 后添加的Valve 位于基础的Valve之 前,并按照添加顺序执行。Pipiline通过获得首个Valve来启动整合链条的执行 。
源码研究
建议看源码流程前先去回顾一下责任链模式,因为tomcat的请求流程中主要使用了责任链模式
责任链模式
我们把请求过程的源码分为两部分来进行分析:
1.Acceptor.run()第一部分: 请求由Endpoint捕获,并转交给Processor处理.
请求的流程由NioEndpoint中的Accepter类的run方法开始.
首先一个浏览器的请求会由tomcat中的Endpoint中的Accepter所捕获并开启会话.
protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; while (running) { while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); SocketChannel socket = null; try { //接受客户端请求 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }
2.Poller.run()当接收客户端请求时,NioEndpoint.Poller会获取事件并开始迭代执行.(一下省略部分代码)
public void run() { while (true) { ....... while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); //开始正式执行请求流程 processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
2.1 Poller.processKey()processKey方法继续调用其中的processSocket方法,并开始处理会话.
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { //开始处理会话 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } }
2.2 AbstractEndpoint.processSocket()当跟踪进processSocket时会发现调用的是AbstractEndpoint的processSocket方法.
他会首先获取Socket的处理器,并获取线程池为处理Socket单独开启一个线程.
public boolean processSocket(SocketWrapperbase socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } //获取socket的处理器 SocketProcessorbase sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } //获取到线程池 Executor executor = getExecutor(); if (dispatch && executor != null) { //由线程池调用一个线程来执行Socket处理器 executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
经过跟踪会最终发现,sc.run()方法实际上是在调用NioEndpoint.SocketProcessor.doRun();
protected void doRun() { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); .... if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket if (event == null) { //获取Handler处理器:Servlet(处理器),调度处理器的处理方法 state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { close(socket, key); } } .... } }
4、AbstractProtocol.ConnectionHandler.process在doRun的方法中最后会交给AbstractProtocol.ConnectionHandler.process方法执行.
@Override public SocketState process(SocketWrapperbase wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process", wrapper.getSocket(), status)); } if (wrapper == null) { // Nothing to do、Socket has been closed. return SocketState.CLOSED; } //获取Socket S socket = wrapper.getSocket(); //交给Processor处理 -> 真正开始处理请求 Processor processor = connections.get(socket); .... do { //开始解析请求 state = processor.process(wrapper, status); ..... } } while ( state == SocketState.UPGRADING);}
至此,请求由Endpoint组件正式转交给Processor进行处理.
5.AbstractProcessorLight.process()主要内容为在请求转交给Coyote适配器后的流程分析,紧接上文中请求交由Processor处理.
当请求由Endpoint交由Processor处理时,首先经过的就是AbstractProcessorLight.process()
public SocketState process(SocketWrapperbase<?> socketWrapper, SocketEvent status) throws IOException { SocketState state = SocketState.CLOSED; Iterator
@Override public SocketState service(SocketWrapperbase<?> socketWrapper) ...、 while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE && !endpoint.isPaused()) { // Parsing the request header try { //解析socket请求数据中每一行,按照http协议解析请求头----只负责解析请求头 if (!inputBuffer.parseRequestLine(keptAlive)) { if (inputBuffer.getParsingRequestLinePhase() == -1) { return SocketState.UPGRADING; } else if (handleIncompleteRequestLineRead()) { break; } } if (endpoint.isPaused()) { // 503 - Service unavailable response.setStatus(503); setErrorState(ErrorState.CLOSE_CLEAN, null); } else { keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); if (!inputBuffer.parseHeaders()) { // We've read part of the request, don't recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!disableUploadTimeout) { socketWrapper.setReadTimeout(connectionUploadTimeout); } } } catch (IOException e) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.header.parse"), e); } setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); break; } catch (Throwable t) { .... } // 400 - Bad Request response.setStatus(400); setErrorState(ErrorState.CLOSE_CLEAN, t); } // Has an upgrade been requested? Enumeration
至此请求就即将被发送给Engine引擎.
在CoyoteAdapter中将Request和Response装换成Servlet容器中处理的Request和Response,然后从service中获取容器,再调用管道Pipeline的阀门Valve的invoke方法
在Coyote适配器中,会获取Engine容器的第一个Valve,即StandardEnginevalve,并执行.
@Override public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception { Request request = (Request) req.getNote(ADAPTER_NOTES); Response response = (Response) res.getNote(ADAPTER_NOTES); ......、 if (connector.getXpoweredBy()) { response.addHeader("X-Powered-By", POWERED_BY); } boolean async = false; boolean postParseSuccess = false; req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get()); try { // Parse and set Catalina and configuration specific // request parameters //解析和设置 Catalina 和特定配置,包括session的查找 //在这里将Request和Response装换成Servlet容器中处理的Request和Response postParseSuccess = postParseRequest(req, request, res, response); if (postParseSuccess) { //check valves if we support async request.setAsyncSupported( connector.getService().getContainer().getPipeline().isAsyncSupported()); // Calling the container //获取Engine容器的第一个Valve,即StandardEnginevalve,并执行. connector.getService().getContainer().getPipeline().getFirst().invoke( request, response); } .......
整个容器中执行的链路如图:
8.StandardEnginevalve#invoke
因为这里默认只有一个标准的Engine value实现:
@Override public final void invoke(Request request, Response response) throws IOException, ServletException { // Select the Host to be used for this Request Host host = request.getHost(); if (host == null) { response.sendError (HttpServletResponse.SC_BAD_REQUEST, sm.getString("standardEngine.noHost", request.getServerName())); return; } if (request.isAsyncSupported()) { request.setAsyncSupported(host.getPipeline().isAsyncSupported()); } // Ask this Host to process this request //这里调用Host管道中的每个pipeline host.getPipeline().getFirst().invoke(request, response); }
9.StandardHostValve#invoke
说明一下:在Host管道中,默认还会加入两个Value,一个是accessLogValue,一个是errorLogValue后者用来处理请求处理过程中发生的错误
接下来依次调用context,wrapper的valve并执行.
10.StandardContextValve#invoke
获取当前请求对应的wrapper
// Select the Wrapper to be used for this Request Wrapper wrapper = request.getWrapper();
调用wrapper管道中所有value
wrapper.getPipeline().getFirst().invoke(request, response);
10.StandardWrapperValve#invoke
重点代码一:分配一个servlet实例对象,来处理当前请求,这里的servlet其实就是我们自己定义的
// Allocate a servlet instance to process this request try { if (!unavailable) { servlet = wrapper.allocate(); }
这里的allocate就是从mapped中获取到当前请求映射的servlet,联系初始化时,扫描web.xml和相关注解
// Create the filter chain for this request ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
给当前请求创建一个过滤器链
public static ApplicationFilterChain createFilterChain(ServletRequest request, Wrapper wrapper, Servlet servlet) { // If there is no servlet to execute, return null if (servlet == null) return null; // Create and initialize a filter chain object ApplicationFilterChain filterChain = null; if (request instanceof Request) { Request req = (Request) request; if (Globals.IS_SECURITY_ENABLED) { // Security: Do not recycle filterChain = new ApplicationFilterChain(); } else { filterChain = (ApplicationFilterChain) req.getFilterChain(); if (filterChain == null) { filterChain = new ApplicationFilterChain(); req.setFilterChain(filterChain); } } } else { // Request dispatcher in use filterChain = new ApplicationFilterChain(); } filterChain.setServlet(servlet); filterChain.setServletSupportsAsync(wrapper.isAsyncSupported()); // Acquire the filter mappings for this Context StandardContext context = (StandardContext) wrapper.getParent(); //wrapper的父容器context中的过滤器集合 FilterMap filterMaps[] = context.findFilterMaps(); // If there are no filter mappings, we are done if ((filterMaps == null) || (filterMaps.length == 0)) return (filterChain); // Acquire the information we will need to match filter mappings DispatcherType dispatcher = (DispatcherType) request.getAttribute(Globals.DISPATCHER_TYPE_ATTR); String requestPath = null; Object attribute = request.getAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR); if (attribute != null){ requestPath = attribute.toString(); } String servletName = wrapper.getName(); // Add the relevant path-mapped filters to this filter chain for (int i = 0; i < filterMaps.length; i++) { if (!matchDispatcher(filterMaps[i] ,dispatcher)) { continue; } if (!matchFiltersURL(filterMaps[i], requestPath)) continue; ApplicationFilterConfig filterConfig = (ApplicationFilterConfig) context.findFilterConfig(filterMaps[i].getFilterName()); if (filterConfig == null) { // FIXME - log configuration problem continue; } //将集合中符合要求当前请求的过滤器都加入过滤器链中 filterChain.addFilter(filterConfig); } // Add filters that match on servlet name second for (int i = 0; i < filterMaps.length; i++) { if (!matchDispatcher(filterMaps[i] ,dispatcher)) { continue; } if (!matchFiltersServlet(filterMaps[i], servletName)) continue; ApplicationFilterConfig filterConfig = (ApplicationFilterConfig) context.findFilterConfig(filterMaps[i].getFilterName()); if (filterConfig == null) { // FIXME - log configuration problem continue; } //将集合中符合要求当前请求的过滤器都加入过滤器链中 filterChain.addFilter(filterConfig); } // Return the completed filter chain //返回构建好的过滤器链 return filterChain; }
调用完创建过滤器的方法后,回到invoke方法中
正式对当前请求调用过滤器链
filterChain.doFilter (request.getRequest(), response.getResponse());
11.ApplicationFilterChain#doFilter
重点代码:
internalDoFilter(request,response);
开始真正执行过滤流程
12.ApplicationFilterChain#internalDoFilter
private void internalDoFilter(ServletRequest request, ServletResponse response) throws IOException, ServletException { // Call the next filter if there is one if (pos < n) { ApplicationFilterConfig filterConfig = filters[pos++]; try { Filter filter = filterConfig.getFilter(); if (request.isAsyncSupported() && "false".equalsIgnoreCase( filterConfig.getFilterDef().getAsyncSupported())) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } if( Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal(); Object[] args = new Object[]{req, res, this}; SecurityUtil.doAsPrivilege ("doFilter", filter, classType, args, principal); } else { //重点代码---第三个参数是当前过滤器链对象--ApplicationFilterChain filter.doFilter(request, response, this); } } catch (IOException | ServletException | RuntimeException e) { throw e; } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); ExceptionUtils.handleThrowable(e); throw new ServletException(sm.getString("filterChain.filter"), e); } return; } // We fell off the end of the chain -- call the servlet instance try { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(request); lastServicedResponse.set(response); } if (request.isAsyncSupported() && !servletSupportsAsync) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } // Use potentially wrapped request from this point if ((request instanceof HttpServletRequest) && (response instanceof HttpServletResponse) && Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal(); Object[] args = new Object[]{req, res}; SecurityUtil.doAsPrivilege("service", servlet, classTypeUsedInService, args, principal); } else { //过滤器链调用完毕,真正调用servlet的方法 servlet.service(request, response); } } catch (IOException | ServletException | RuntimeException e) { throw e; } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); ExceptionUtils.handleThrowable(e); throw new ServletException(sm.getString("filterChain.servlet"), e); } finally { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(null); lastServicedResponse.set(null); } } }
这是过滤器的原理
下面看sevlet的调用,下面演示的是正常流程:
13.HttpServlet#serviceprotected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String method = req.getMethod(); if (method.equals(METHOD_GET)) { long lastModified = getLastModified(req); if (lastModified == -1) { // servlet doesn't support if-modified-since, no reason // to go through further expensive logic doGet(req, resp); } else { long ifModifiedSince; try { ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE); } catch (IllegalArgumentException iae) { // Invalid date header - proceed as if none was set ifModifiedSince = -1; } if (ifModifiedSince < (lastModified / 1000 * 1000)) { // If the servlet mod time is later, call doGet() // Round down to the nearest second for a proper compare // A ifModifiedSince of -1 will always be less maybeSetLastModified(resp, lastModified); doGet(req, resp); } else { resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED); } } } else if (method.equals(METHOD_HEAD)) { long lastModified = getLastModified(req); maybeSetLastModified(resp, lastModified); doHead(req, resp); } else if (method.equals(METHOD_POST)) { doPost(req, resp); } else if (method.equals(METHOD_PUT)) { doPut(req, resp); } else if (method.equals(METHOD_DELETE)) { doDelete(req, resp); } else if (method.equals(METHOD_OPTIONS)) { doOptions(req,resp); } else if (method.equals(METHOD_TRACE)) { doTrace(req,resp); } else { // // Note that this means NO servlet supports whatever // method was requested, anywhere on this server. // String errMsg = lStrings.getString("http.method_not_implemented"); Object[] errArgs = new Object[1]; errArgs[0] = method; errMsg = MessageFormat.format(errMsg, errArgs); resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg); } }
这里的doGet和doPost等方法的调用就是在我们自己的servlet中调用了,因为我们自己的servlet没有重写service方法,所以这里service是调用父类的,而重写的doGet等方法,是调用对应子类的实现
13.DefaultServlet#service
如果没有找到对应的请求映射servlet,会返回DefaultServlet
@Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { if (req.getDispatcherType() == DispatcherType.ERROR) { doGet(req, resp); } else { super.service(req, resp); } }
最终走的是HttpSerlvet的最后一个else分支,因为DefaultServlet是HttpServlet的子类
else { // // Note that this means NO servlet supports whatever // method was requested, anywhere on this server. // String errMsg = lStrings.getString("http.method_not_implemented"); Object[] errArgs = new Object[1]; errArgs[0] = method; errMsg = MessageFormat.format(errMsg, errArgs); resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg); }
到此请求流程代码分析完毕