/** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */ publicbooleanprocessSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { returnfalse; } //尝试使用缓存中的SocketProcessor SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { //创建一个新的SocketProcessor sc = createSocketProcessor(socketWrapper, event); } else { //重置 sc.reset(socketWrapper, event); } //获取工作线程池 Executorexecutor= getExecutor(); if (dispatch && executor != null) { //在线程池中执行SocketProcessor任务 executor.execute(sc); } else { //在当前线程执行SocketProcessor的run方法 sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); returnfalse; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); returnfalse; } returntrue; }
@Override publicfinalvoidrun() { synchronized (socketWrapper) { // It is possible that processing may be triggered for read and // write at the same time. The sync above makes sure that processing // does not occur in parallel. The test below ensures that if the // first event to be processed results in the socket being closed, // the subsequent events are not processed. if (socketWrapper.isClosed()) { return; } doRun(); } } @Override protectedvoiddoRun() { //获取用户连接 NioChannelsocket= socketWrapper.getSocket(); SelectionKeykey= socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try { //https的握手行为 inthandshake= -1;
try { if (key != null) { if (socket.isHandshakeComplete()) { // No TLS handshaking required. Let the handler // process this socket / event combination. handshake = 0; } elseif (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT || event == SocketEvent.ERROR) { // Unable to complete the TLS handshake. Treat it as // if the handshake failed. handshake = -1; } else { handshake = socket.handshake(key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. event = SocketEvent.OPEN_READ; } } } catch (IOException x) { handshake = -1; if (log.isDebugEnabled()) log.debug("Error during SSL handshake", x); } catch (CancelledKeyException ckx) { handshake = -1; } //握手完成,或者不需要握手时 handshake == 0 if (handshake == 0) { SocketStatestate= SocketState.OPEN; // Process the request from this socket if (event == null) { //默认是读事件处理 state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { //相应指定事件 state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { close(socket, key); } } elseif (handshake == -1) { close(socket, key); } elseif (handshake == SelectionKey.OP_READ) { socketWrapper.registerReadInterest(); } elseif (handshake == SelectionKey.OP_WRITE) { socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error("", t); socket.getPoller().cancelledKey(key); } finally { socketWrapper = null; event = null; //return to cache if (running && !paused) { //缓存 processorCache.push(this); } } } }
看下面这句代码:
1 2
//相应指定事件 state = getHandler().process(socketWrapper, event);
@Override public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
SocketStatestate= SocketState.CLOSED; Iterator<DispatchType> dispatches = null; do { if (dispatches != null) { DispatchTypenextDispatch= dispatches.next(); //处理不在标准HTTP模式下的正在处理中的请求。 //目前使用的包括Servlet 3.0异步和HTTP升级连接 //将来可能会增加更多的用途。这些通常以HTTP请求开始。 state = dispatch(nextDispatch.getSocketStatus()); } elseif (status == SocketEvent.DISCONNECT) { // Do nothing here, just wait for it to get recycled } elseif (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) { state = dispatch(status); if (state == SocketState.OPEN) { // There may be pipe-lined data to read. If the data isn't // processed now, execution will exit this loop and call // release() which will recycle the processor (and input // buffer) deleting any pipe-lined data. To avoid this, // process it now. state = service(socketWrapper); } } elseif (status == SocketEvent.OPEN_WRITE) { // Extra write event likely after async, ignore state = SocketState.LONG; } elseif (status == SocketEvent.OPEN_READ){ state = service(socketWrapper); } else { // Default to closing the socket if the SocketEvent passed in // is not consistent with the current state of the Processor state = SocketState.CLOSED; }
if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]"); }
if (state != SocketState.CLOSED && isAsync()) { state = asyncPostProcess(); if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]"); } }
if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. dispatches = getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED);
// Parsing the request header //解析请求头 try { if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(), protocol.getKeepAliveTimeout())) { if (inputBuffer.getParsingRequestLinePhase() == -1) { return SocketState.UPGRADING; } elseif (handleIncompleteRequestLineRead()) { break; } }
if (protocol.isPaused()) { //response:org.apache.coyote.Response // 503 - Service unavailable //服务无效;无法提供服务;找不到服务器 response.setStatus(503); setErrorState(ErrorState.CLOSE_CLEAN, null); } else { keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount()); if (!inputBuffer.parseHeaders()) { // We've read part of the request, don't recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!protocol.getDisableUploadTimeout()) { socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout()); } } } catch (IOException e) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.header.parse"), e); } setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); break; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); UserDataHelper.ModelogMode= userDataHelper.getNextMode(); if (logMode != null) { Stringmessage= sm.getString("http11processor.header.parse"); switch (logMode) { case INFO_THEN_DEBUG: message += sm.getString("http11processor.fallToDebug"); //$FALL-THROUGH$ case INFO: log.info(message, t); break; case DEBUG: log.debug(message, t); } } // 400 - Bad Request response.setStatus(400); setErrorState(ErrorState.CLOSE_CLEAN, t); }
// Has an upgrade been requested? //升级协议 Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection"); booleanfoundUpgrade=false; while (connectionValues.hasMoreElements() && !foundUpgrade) { foundUpgrade = connectionValues.nextElement().toLowerCase( Locale.ENGLISH).contains("upgrade"); }
if (foundUpgrade) { // Check the protocol StringrequestedProtocol= request.getHeader("Upgrade");
// Process the request in the adapter if (getErrorState().isIoAllowed()) { try { //服务处理阶段 rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); //获取CoyoteAdapter,调用其service方法 getAdapter().service(request, response); // Handle when the response was committed before a serious // error occurred. Throwing a ServletException should both // set the status to 500 and set the errorException. // If we fail here, then the response is likely already // committed, so we can't try and set headers. if (keepAlive && !getErrorState().isError() && !isAsync() && statusDropsConnection(response.getStatus())) { setErrorState(ErrorState.CLOSE_CLEAN, null); } } catch (InterruptedIOException e) { setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); } catch (HeadersTooLargeException e) { log.error(sm.getString("http11processor.request.process"), e); // The response should not have been committed but check it // anyway to be safe if (response.isCommitted()) { setErrorState(ErrorState.CLOSE_NOW, e); } else { response.reset(); response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, e); response.setHeader("Connection", "close"); // TODO: Remove } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("http11processor.request.process"), t); // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } }
// Finish the handling of the request //完成请求的处理 rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); if (!isAsync()) { // If this is an async request then the request ends when it has // been completed. The AsyncContext is responsible for calling // endRequest() in that case. endRequest(); } rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// If there was an error, make sure the request is counted as // and error, and update the statistics counter if (getErrorState().isError()) { response.setStatus(500); }
if (!isAsync() || getErrorState().isError()) { request.updateCounters(); if (getErrorState().isIoAllowed()) { inputBuffer.nextRequest(); outputBuffer.nextRequest(); } }
if (!protocol.getDisableUploadTimeout()) { intconnectionTimeout= protocol.getConnectionTimeout(); if (connectionTimeout > 0) { socketWrapper.setReadTimeout(connectionTimeout); } else { socketWrapper.setReadTimeout(0); } }
try { // Parse and set Catalina and configuration specific // request parameters //解析并设置Catalina和配置特定的请求参数 postParseSuccess = postParseRequest(req, request, res, response); if (postParseSuccess) { //check valves if we support async //如果我们支持异步止回阀 request.setAsyncSupported( connector.getService().getContainer().getPipeline().isAsyncSupported()); // Calling the container //调用容器 connector.getService().getContainer().getPipeline().getFirst().invoke( request, response); } //如果是异步请求 if (request.isAsync()) { async = true; ReadListenerreadListener= req.getReadListener(); if (readListener != null && request.isFinished()) { // Possible the all data may have been read during service() // method so this needs to be checked here ClassLoaderoldCL=null; try { oldCL = request.getContext().bind(false, null); if (req.sendAllDataReadEvent()) { req.getReadListener().onAllDataRead(); } } finally { request.getContext().unbind(false, oldCL); } }
// If an async request was started, is not going to end once // this container thread finishes and an error occurred, trigger // the async error process if (!request.isAsyncCompleting() && throwable != null) { request.getAsyncContextInternal().setErrorState(throwable, true); } } else { //结束请求和响应 request.finishRequest(); response.finishResponse(); }
if (request.isAsyncCompleting() && error.get()) { // Connection will be forcibly closed which will prevent // completion happening at the usual point. Need to trigger // call to onComplete() here. res.action(ActionCode.ASYNC_POST_PROCESS, null); async = false; }
// Access log if (!async && postParseSuccess) { // Log only if processing was invoked. // If postParseRequest() failed, it has already logged it. Contextcontext= request.getContext(); Hosthost= request.getHost(); // If the context is null, it is likely that the endpoint was // shutdown, this connection closed and the request recycled in // a different thread. That thread will have updated the access // log so it is OK not to update the access log here in that // case. // The other possibility is that an error occurred early in // processing and the request could not be mapped to a Context. // Log via the host or engine in that case. longtime= System.currentTimeMillis() - req.getStartTime(); if (context != null) { context.logAccess(request, response, time, false); } elseif (response.isError()) { if (host != null) { host.logAccess(request, response, time, false); } else { connector.getService().getContainer().logAccess( request, response, time, false); } } }