Definition

From the official Connector documentation:
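The HTTP Connector element represents a Connector component that supports the HTTP/1.1 protocol. It enables Catalina to function as a stand-alone web server, in addition to its ability to execute servlets and JSP pages. A particular instance of this component listens for connections on a specific TCP port number on the server. One or more such Connectors can be configured as part of a single Service, each forwarding to the associated Engine to perform request processing and create the response.

If you wish to configure the Connector that is used for connections to web servers using the AJP protocol (such as the mod_jk 1.2.x connector for Apache 1.3), please refer to the AJP Connector documentation.

Each incoming request requires a thread for the duration of that request. If more simultaneous requests are received than can be handled by the currently available request processing threads, additional threads will be created up to the configured maximum (the value of the maxThreads attribute). If still more simultaneous requests are received, they are stacked up inside the server socket created by the Connector, up to the configured maximum (the value of the acceptCount attribute). Any further simultaneous requests will receive "connection refused" errors, until resources are available to process them.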
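The thread and backlog limits described above can be illustrated with a toy model. This is only a sketch of the documented behaviour, not Tomcat code: the class `RequestAdmission` and its method `fate` are hypothetical names, the numbers in `main` are example values, and the model ignores other limits and any operating-system specifics of the accept queue.

```java
/** Toy model of the admission rules quoted above; not Tomcat code. */
final class RequestAdmission {

    enum Fate { PROCESSED_BY_THREAD, QUEUED_IN_BACKLOG, REFUSED }

    /**
     * Classifies the n-th simultaneous request (1-based), assuming that
     * none of the earlier requests has finished yet.
     */
    static Fate fate(int n, int maxThreads, int acceptCount) {
        if (n <= maxThreads) {
            // A request processing thread is available or can still be created
            return Fate.PROCESSED_BY_THREAD;
        }
        if (n <= maxThreads + acceptCount) {
            // The request waits in the server socket created by the connector
            return Fate.QUEUED_IN_BACKLOG;
        }
        // Both limits are exhausted: the client sees "connection refused"
        return Fate.REFUSED;
    }

    public static void main(String[] args) {
        // Example values chosen for illustration only
        int maxThreads = 200;
        int acceptCount = 100;
        System.out.println(fate(1, maxThreads, acceptCount));
        System.out.println(fate(250, maxThreads, acceptCount));
        System.out.println(fate(301, maxThreads, acceptCount));
    }
}
```

The point of the model is the ordering of the two limits: threads are exhausted first, then the accept queue, and only then are connections refused.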
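When we use Tomcat, we usually rely on only two of its features: the web server and the servlet container. JSP is rarely used nowadays, and I have not come across AJP in practice. So a Connector can be understood simply as the component that listens on a specific TCP port, receives client requests, wraps them in Request and Response objects, passes them to a Servlet for business processing, and finally returns the result to the client.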
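To make that simplified picture concrete, here is a minimal sketch using only the JDK. It is not how Tomcat is implemented: `MiniConnector`, its nested `Request` class, and the `demo` helper are hypothetical names, the handler function merely stands in for a servlet, and the code handles exactly one plain GET request over a blocking socket.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical, heavily simplified stand-in for a connector; not Tomcat code. */
final class MiniConnector implements AutoCloseable {

    /** Minimal request wrapper: only the method and path of the request line. */
    static final class Request {
        final String method;
        final String path;
        Request(String method, String path) { this.method = method; this.path = path; }
    }

    private final ServerSocket serverSocket;
    private final Function<Request, String> handler; // stands in for the servlet

    MiniConnector(Function<Request, String> handler) {
        try {
            // Port 0 lets the OS pick a free port; the backlog of 50 is arbitrary
            this.serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.handler = handler;
    }

    int port() { return serverSocket.getLocalPort(); }

    /** Accepts one connection, wraps the request, calls the handler, writes the response. */
    void serveOne() {
        try (Socket socket = serverSocket.accept()) {
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            String requestLine = in.readLine(); // e.g. "GET /hello HTTP/1.1"
            if (requestLine == null) {
                return;
            }
            String line;
            while ((line = in.readLine()) != null && !line.isEmpty()) {
                // Headers are read and ignored in this sketch
            }
            String[] parts = requestLine.split(" ");
            Request request = new Request(parts[0], parts.length > 1 ? parts[1] : "/");
            byte[] body = handler.apply(request).getBytes(StandardCharsets.US_ASCII);
            OutputStream out = socket.getOutputStream();
            out.write(("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: "
                    + body.length + "\r\nConnection: close\r\n\r\n")
                    .getBytes(StandardCharsets.US_ASCII));
            out.write(body);
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }

    /** Starts a connector, sends one GET request to it, and returns the raw response. */
    static String demo(String path) {
        try (MiniConnector connector = new MiniConnector(req -> "Hello from " + req.path)) {
            Thread server = new Thread(connector::serveOne, "mini-connector");
            server.setDaemon(true);
            server.start();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), connector.port())) {
                OutputStream out = client.getOutputStream();
                out.write(("GET " + path + " HTTP/1.1\r\nHost: localhost\r\n\r\n")
                        .getBytes(StandardCharsets.US_ASCII));
                out.flush();
                return new String(client.getInputStream().readAllBytes(), StandardCharsets.US_ASCII);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `MiniConnector.demo("/hello")` returns the full HTTP response text. Tomcat splits the same responsibilities across the endpoint, processor, and adapter classes discussed in this article.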
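The Connector component involves Http11NioProtocol (ProtocolHandler), Http11Processor (Processor), CoyoteAdapter (Adapter), NioEndpoint, ConnectionHandler (AbstractEndpoint.Handler), Poller, Acceptor, and others. These classes cooperate to listen for, process, wrap, and pass on requests, and to respond to the client.

Let's walk through the Connector class following its lifecycle: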
Constructing the Connector

```java
public Connector(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    // Map the protocol name to a ProtocolHandler implementation class
    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
        }
    } else {
        // Anything else is treated as a class name
        protocolHandlerClassName = protocol;
    }

    // Instantiate the protocol handler reflectively
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}
```
The constructor first creates a ProtocolHandler from the protocol name. Assume it is Http11NioProtocol here; this class drives everything that follows. Next, the Http11NioProtocol constructor chain:

```java
// Http11NioProtocol
public Http11NioProtocol() {
    super(new NioEndpoint());
}

// AbstractHttp11JsseProtocol
public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) {
    super(endpoint);
}

// AbstractHttp11Protocol
public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}

// AbstractProtocol
public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
    this.endpoint = endpoint;
    setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
    setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
```
This chain creates the NioEndpoint and the ConnectionHandler.
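The Connector constructor uses a common pattern: resolve a short name to a class name, then instantiate it through its no-argument constructor, and leave the field null if that fails. The sketch below shows the same pattern with JDK classes as stand-ins. `HandlerFactory`, its methods, and the aliases `"array"` and `"linked"` are hypothetical and exist only for illustration.

```java
import java.util.List;

/** Hypothetical illustration of name-to-class resolution plus reflective creation. */
final class HandlerFactory {

    /** Maps a short alias to a class name; anything else is treated as a class name. */
    static String resolveClassName(String protocol) {
        if (protocol == null || "array".equals(protocol)) {
            return "java.util.ArrayList";
        } else if ("linked".equals(protocol)) {
            return "java.util.LinkedList";
        }
        return protocol;
    }

    /** Returns a new instance, or null if the class cannot be loaded, created, or cast. */
    static List<?> create(String protocol) {
        String className = resolveClassName(protocol);
        try {
            Class<?> clazz = Class.forName(className);
            return (List<?>) clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Like the Connector constructor, report the failure and carry on with null
            System.err.println("Could not instantiate " + className + ": " + e);
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(create(null).getClass().getName());
        System.out.println(create("linked").getClass().getName());
        System.out.println(create("no.such.Handler"));
    }
}
```

Deferring the failure this way is why initInternal later checks whether protocolHandler is null before going any further.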
initInternal: initialization

```java
@Override
protected void initInternal() throws LifecycleException {

    super.initInternal();

    if (protocolHandler == null) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
    }

    // Create the adapter and hand it to the protocol handler
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);

    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }

    if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
        throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceof AbstractHttp11JsseProtocol) {
        AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                (AbstractHttp11JsseProtocol<?>) protocolHandler;
        if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
        }
    }

    try {
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
    }
}
```
This first creates the CoyoteAdapter, the class that interacts with the Container (and ultimately the Servlet), and then initializes the protocolHandler. Next, the protocolHandler.init() method:

```java
@Override
public void init() throws Exception {
    for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
        configureUpgradeProtocol(upgradeProtocol);
    }

    super.init();
}
```
It configures the upgrade protocols and then calls the superclass init():

```java
@Override
public void init() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
    }

    if (oname == null) {
        oname = createObjectName();
        if (oname != null) {
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null, null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length()-1));
    endpoint.setDomain(domain);

    endpoint.init();
}
```
The important call here is endpoint.init():

```java
public final void init() throws Exception {
    if (bindOnInit) {
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}
```
The bindOnInit flag controls whether the server socket is bound during initialization. It defaults to true, so we enter the bind() method:

```java
@Override
public void bind() throws Exception {
    initServerSocket();

    if (acceptorThreadCount == 0) {
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    initialiseSsl();

    selectorPool.open();
}

protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress() != null
                ? new InetSocketAddress(getAddress(), getPort())
                : new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true);
}
```
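bind() opens the ServerSocketChannel, binds it to the configured address and port with acceptCount as the backlog, and puts it in blocking mode. It also normalizes the acceptor and poller thread counts, initializes SSL, and opens the selector pool.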
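The socket setup in initServerSocket() relies only on standard java.nio calls, so it can be tried in isolation. The following is a minimal sketch, not Tomcat code: `BindSketch` and its methods are hypothetical names, and it binds to the loopback address on an OS-chosen port (port 0) so that it can run anywhere.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

/** Hypothetical sketch of opening a blocking server socket channel with a backlog. */
final class BindSketch {

    /** Opens a channel, binds it with the given backlog, and leaves it in blocking mode. */
    static ServerSocketChannel openServerSocket(int port, int backlog) throws IOException {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        try {
            InetSocketAddress addr =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
            // The backlog plays the role that acceptCount plays in the code above
            serverSock.socket().bind(addr, backlog);
            // accept() on this channel will block until a connection arrives
            serverSock.configureBlocking(true);
            return serverSock;
        } catch (IOException e) {
            serverSock.close();
            throw e;
        }
    }

    /** Binds to a free port, reports which port was chosen, and closes the channel again. */
    static int bindAndReportPort(int backlog) {
        try (ServerSocketChannel serverSock = openServerSocket(0, backlog)) {
            return serverSock.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Bound to port " + bindAndReportPort(100));
    }
}
```

Note that the backlog passed to bind() is only a hint; the operating system may cap or adjust it.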
startInternal: startup

```java
@Override
protected void startInternal() throws LifecycleException {

    // Validate the port before starting
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}
```
Connector.startInternal() ends by calling protocolHandler.start(). From initInternal in the previous step to this one, it is already fairly clear that the protocolHandler is the core of the Connector component.

```java
@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
    }

    endpoint.start();

    asyncTimeout = new AsyncTimeout();
    Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
    int priority = endpoint.getThreadPriority();
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        priority = Thread.NORM_PRIORITY;
    }
    timeoutThread.setPriority(priority);
    timeoutThread.setDaemon(true);
    timeoutThread.start();
}
```
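There are two steps here: starting the endpoint and starting the AsyncTimeout thread. Both were analyzed in earlier posts, so this is only a quick recap. AsyncTimeout periodically checks whether asynchronous requests have timed out. The important part is still the endpoint.start() method:

```java
public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}
```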
Here we only need to look at the endpoint's startInternal() method:

```java
@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create the worker thread pool if none was configured
        if (getExecutor() == null) {
            createExecutor();
        }

        initializeConnectionLatch();

        // Start the poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        startAcceptorThreads();
    }
}
```
This mainly creates the acceptor and poller threads. A brief look at each:
Acceptor

Its main job is to accept new connections, wrap them, and register them with a Poller.
Poller

First, the constructor:

```java
public Poller() throws IOException {
    this.selector = Selector.open();
}
```
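Each Poller owns one Selector, which it uses to detect sockets that are ready. By default there are at most 2 Pollers. This is only a brief introduction; a detailed analysis will follow in a later post.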
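The division of labour between a blocking accept and a Selector that watches accepted sockets can be tried out with plain java.nio. The sketch below is an illustration under simplifying assumptions, not Tomcat's Poller: `PollerSketch` and `pollOnce` are hypothetical names, everything runs on one thread, and a single short message is assumed to arrive in one read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical single-threaded illustration of accept, register, and select. */
final class PollerSketch {

    /** Sends a message over loopback and returns what the selector-driven read saw. */
    static String pollOnce(String message) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 // Blocking accept, which is the acceptor's side of the work
                 SocketChannel accepted = server.accept()) {

                // A channel must be non-blocking before it can be registered
                accepted.configureBlocking(false);
                accepted.register(selector, SelectionKey.OP_READ);

                client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                // Wait up to two seconds for the accepted socket to become readable
                if (selector.select(2000) == 0) {
                    return null;
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        // Real code would loop; one read is enough for a short message
                        int read = ((SocketChannel) key.channel()).read(buffer);
                        if (read > 0) {
                            return new String(buffer.array(), 0, read, StandardCharsets.UTF_8);
                        }
                    }
                }
                return null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnce("ping"));
    }
}
```

In a real server the ready socket would be handed to a worker thread rather than read on the selecting thread.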
stopInternal: shutdown

```java
@Override
protected void stopInternal() throws LifecycleException {

    setState(LifecycleState.STOPPING);

    try {
        if (protocolHandler != null) {
            protocolHandler.stop();
        }
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
    }
}
```
Again this delegates to the protocolHandler, this time to its stop() method:

```java
@Override
public void stop() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
    }

    if (asyncTimeout != null) {
        asyncTimeout.stop();
    }

    endpoint.stop();
}
```
This stops the asynchronous timeout thread and then stops the endpoint. The stop() method in AbstractEndpoint:

```java
public final void stop() throws Exception {
    stopInternal();
    if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
        unbind();
        bindState = BindState.UNBOUND;
    }
}
```
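It calls NioEndpoint's stopInternal() and, depending on the bind state, unbind(). These methods will be analyzed in a later post and are skipped for now.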
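The start and stop handling of the AsyncTimeout thread follows a familiar pattern: a daemon thread that wakes up at a fixed interval and exits once a flag is cleared. The sketch below shows that pattern only. `TimeoutChecker` and `demo` are hypothetical names, the 10 ms interval is arbitrary, and the check is a simple counter rather than Tomcat's timeout logic.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical periodic checker run on a daemon thread; not Tomcat's AsyncTimeout. */
final class TimeoutChecker implements Runnable {

    private final long intervalMillis;
    private final Runnable check;            // what to do on every tick
    private volatile boolean running = true; // cleared by stop()

    TimeoutChecker(long intervalMillis, Runnable check) {
        this.intervalMillis = intervalMillis;
        this.check = check;
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (running) {
                check.run();
            }
        }
    }

    /** Asks the loop to finish after its current sleep. */
    void stop() {
        running = false;
    }

    /** Runs a checker briefly; returns the tick count, or -1 if the thread did not stop. */
    static int demo() {
        AtomicInteger ticks = new AtomicInteger();
        TimeoutChecker checker = new TimeoutChecker(10, ticks::incrementAndGet);
        Thread thread = new Thread(checker, "demo-AsyncTimeout");
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(true); // does not keep the JVM alive
        thread.start();
        try {
            Thread.sleep(200);
            checker.stop();
            thread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.isAlive() ? -1 : ticks.get();
    }

    public static void main(String[] args) {
        System.out.println("ticks: " + demo());
    }
}
```

Using a volatile flag instead of interrupting the thread lets the loop finish its current iteration cleanly.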
destroyInternal: destruction

```java
@Override
protected void destroyInternal() throws LifecycleException {
    try {
        if (protocolHandler != null) {
            protocolHandler.destroy();
        }
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerDestroyFailed"), e);
    }

    // Detach this Connector from its Service
    if (getService() != null) {
        getService().removeConnector(this);
    }

    super.destroyInternal();
}
```
Besides calling protocolHandler.destroy(), this also removes the association between the Service and the current Connector. The protocolHandler.destroy() method, followed by the endpoint.destroy() method it calls:

```java
// protocolHandler.destroy()
@Override
public void destroy() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
    }

    try {
        endpoint.destroy();
    } finally {
        if (oname != null) {
            if (mserver == null) {
                Registry.getRegistry(null, null).unregisterComponent(oname);
            } else {
                try {
                    mserver.unregisterMBean(oname);
                } catch (MBeanRegistrationException | InstanceNotFoundException e) {
                    getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
                            oname, mserver));
                }
            }
        }

        if (rgOname != null) {
            Registry.getRegistry(null, null).unregisterComponent(rgOname);
        }
    }
}

// endpoint.destroy(), invoked above
public final void destroy() throws Exception {
    if (bindState == BindState.BOUND_ON_INIT) {
        unbind();
        bindState = BindState.UNBOUND;
    }
    Registry registry = Registry.getRegistry(null, null);
    registry.unregisterComponent(oname);
    for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
        unregisterJmx(sslHostConfig);
    }
}
```
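The bind-state checks are spread across the endpoint's init(), start(), stop(), and destroy() methods, so it helps to see them side by side. The sketch below copies only those conditions into a small stand-alone class. `BindLifecycle` and its event list are hypothetical, and bind() and unbind() merely record that they were called.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone model of the bind-state transitions shown in this article. */
final class BindLifecycle {

    enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START, SOCKET_CLOSED_ON_STOP }

    private final boolean bindOnInit;
    private BindState bindState = BindState.UNBOUND;
    final List<String> events = new ArrayList<>(); // records bind/unbind calls

    BindLifecycle(boolean bindOnInit) {
        this.bindOnInit = bindOnInit;
    }

    BindState state() { return bindState; }

    void init() {
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }

    void start() {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
    }

    void stop() {
        // A socket bound during init stays bound across stop()
        if (bindState == BindState.BOUND_ON_START
                || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
            unbind();
            bindState = BindState.UNBOUND;
        }
    }

    void destroy() {
        // A socket bound during init is only released here
        if (bindState == BindState.BOUND_ON_INIT) {
            unbind();
            bindState = BindState.UNBOUND;
        }
    }

    private void bind() { events.add("bind"); }

    private void unbind() { events.add("unbind"); }

    public static void main(String[] args) {
        BindLifecycle endpoint = new BindLifecycle(true);
        endpoint.init();
        endpoint.start();
        endpoint.stop();
        System.out.println("after stop: " + endpoint.state());
        endpoint.destroy();
        System.out.println("after destroy: " + endpoint.state() + " " + endpoint.events);
    }
}
```

With bindOnInit set to true the socket is bound in init() and released only in destroy(); with false it is bound in start() and released in stop().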
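That is where this article ends. One important piece of logic has not been covered: the Acceptor accepts a connection and hands it to a Poller, which passes it to a worker thread to be wrapped, and finally the container's pipeline is invoked to process it.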