Definition

Official Connector documentation

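The HTTP Connector element represents a Connector component that supports the HTTP/1.1 protocol. In addition to executing servlets and JSP pages, it enables Catalina to function as a stand-alone web server.
A particular instance of this component listens for connections on a specific TCP port number on the server. One or more such Connectors can be configured as part of a single Service,
each forwarding to the associated Engine to perform request processing and create the response.
If you wish to configure a Connector used for connections to web servers over the AJP protocol (such as the mod_jk 1.2.x connector for Apache 1.3), see the AJP Connector documentation.
Each incoming request requires a thread for the duration of that request. If more simultaneous requests are received than the currently available request processing threads can handle, additional threads are created
up to the configured maximum (the value of the maxThreads attribute). If still more simultaneous requests are received, they are stacked up inside the server socket created by the Connector,
up to the configured maximum (the value of the acceptCount attribute). Any further simultaneous requests receive "connection refused" errors until resources are available to process them.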
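The thread and backlog limits described in that paragraph can be restated as simple arithmetic. The sketch below is only a toy model of the documentation text: the class and method names are hypothetical, and it ignores keep-alive, maxConnections and the fact that the operating system treats the backlog as a hint.

```java
/** Hypothetical toy model of the maxThreads / acceptCount description; not Tomcat code. */
final class ConnectorCapacityModel {

    /** How a burst of simultaneous requests is split up. */
    static final class Outcome {
        final int processing; // handled by request processing threads
        final int queued;     // waiting in the server socket backlog
        final int refused;    // would see "connection refused"

        Outcome(int processing, int queued, int refused) {
            this.processing = processing;
            this.queued = queued;
            this.refused = refused;
        }
    }

    static Outcome classify(int simultaneous, int maxThreads, int acceptCount) {
        int processing = Math.min(simultaneous, maxThreads);
        int queued = Math.min(simultaneous - processing, acceptCount);
        int refused = simultaneous - processing - queued;
        return new Outcome(processing, queued, refused);
    }

    public static void main(String[] args) {
        // Example values only: 350 simultaneous requests, maxThreads=200, acceptCount=100
        Outcome o = classify(350, 200, 100);
        System.out.println(o.processing + " processing, " + o.queued + " queued, " + o.refused + " refused");
    }
}
```

With these example values the model reports 200 requests being processed, 100 queued and 50 refused.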

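When we use Tomcat, we generally rely on only two of its roles: web server and Servlet container. JSP is rarely used nowadays, and I have not come across AJP in practice.
So the Connector can be understood simply as follows: it listens on a specific TCP port, accepts user requests, wraps them in Request and Response objects, passes them to the Servlet for business processing, and finally returns the result to the client.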
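To make that simplified picture concrete, here is a minimal sketch using only the JDK. It is not how Tomcat is implemented: ToyConnector and its handler function are hypothetical names, the "servlet" is just a function from request path to response body, and only one request per connection is served.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical toy connector (not Tomcat code): listen, parse, dispatch, respond. */
final class ToyConnector implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final Function<String, String> handler; // stands in for the servlet

    ToyConnector(int port, Function<String, String> handler) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 picks any free port
        this.handler = handler;
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    /** Accepts one connection, serves one request on it, then closes the connection. */
    void serveOne() throws IOException {
        try (Socket socket = serverSocket.accept()) {
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
            String requestLine = in.readLine(); // e.g. "GET /hello HTTP/1.1"
            if (requestLine == null) {
                return; // client closed without sending anything
            }
            String header;
            while ((header = in.readLine()) != null && !header.isEmpty()) {
                // headers are ignored in this sketch
            }
            String[] parts = requestLine.split(" ");
            String path = parts.length > 1 ? parts[1] : "/";
            byte[] body = handler.apply(path).getBytes(StandardCharsets.UTF_8);
            String head = "HTTP/1.1 200 OK\r\nContent-Length: " + body.length
                    + "\r\nConnection: close\r\n\r\n";
            OutputStream out = socket.getOutputStream();
            out.write(head.getBytes(StandardCharsets.ISO_8859_1));
            out.write(body);
            out.flush();
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }
}
```

Calling serveOne() in a loop on a worker thread would give a very crude single-threaded server. The classes discussed in the rest of this article exist largely to do the same job with non-blocking I/O and a thread pool.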

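The Connector component involves Http11NioProtocol (ProtocolHandler), Http11Processor (Processor), CoyoteAdapter (Adapter),
NioEndpoint, ConnectionHandler (AbstractEndpoint.Handler), Poller, Acceptor and others. These classes cooperate to listen for, process, wrap and pass on requests, and to respond to the client.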

Let's walk through the Connector class in lifecycle order:

Constructing the Connector

public Connector(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
        }
    } else {
        protocolHandlerClassName = protocol;
    }

    // Instantiate protocol handler
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    // Default for Connector depends on this system property
    setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}
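The name-to-class mapping in this constructor can be restated as a pure function, which makes the fallback rule easier to see. This is only a sketch: ProtocolResolution and its method names are hypothetical, and the APR check is reduced to a boolean parameter.

```java
/** Hypothetical restatement of the constructor's protocol-name mapping; not Tomcat code. */
final class ProtocolResolution {

    static String resolveHandlerClassName(String protocol, boolean aprConnector) {
        if (protocol == null || "HTTP/1.1".equals(protocol)) {
            return aprConnector ? "org.apache.coyote.http11.Http11AprProtocol"
                                : "org.apache.coyote.http11.Http11NioProtocol";
        }
        if ("AJP/1.3".equals(protocol)) {
            return aprConnector ? "org.apache.coyote.ajp.AjpAprProtocol"
                                : "org.apache.coyote.ajp.AjpNioProtocol";
        }
        // Anything else is taken to be a fully qualified class name
        return protocol;
    }

    /** Reflective instantiation in the same style; yields null on failure, like the finally block. */
    static Object instantiate(String className) {
        try {
            return Class.forName(className).getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return null;
        }
    }
}
```

The last branch means any class name can be passed as the protocol. If that class cannot be instantiated, the constructor only logs the error and leaves protocolHandler as null, and it is initInternal() that later turns this into a LifecycleException.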

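The constructor first creates a ProtocolHandler from the protocol name; assume it is Http11NioProtocol here. This class drives everything that follows. Next, the Http11NioProtocol constructor chain: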

// Http11NioProtocol
public Http11NioProtocol() {
    super(new NioEndpoint());
}
// AbstractHttp11JsseProtocol
public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) {
    super(endpoint);
}
// AbstractHttp11Protocol
public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}
// AbstractProtocol
public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
    this.endpoint = endpoint;
    setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
    setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}

This chain creates the NioEndpoint and the ConnectionHandler, and registers the handler with both the protocol and the endpoint.

initInternal: initialization

@Override
protected void initInternal() throws LifecycleException {
    // Initialize the parent class
    super.initInternal();
    if (protocolHandler == null) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
    }
    // Initialize adapter
    adapter = new CoyoteAdapter(this);
    // Hand the adapter to the protocolHandler
    protocolHandler.setAdapter(adapter);
    // Make sure parseBodyMethodsSet has a default
    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }
    if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
        throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceof AbstractHttp11JsseProtocol) {
        AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                (AbstractHttp11JsseProtocol<?>) protocolHandler;
        if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
            jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
        }
    }
    try {
        // Initialize the protocolHandler
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
    }
}
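The wiring order in initInternal() (create the adapter, hand it to the protocol handler, then initialize the handler) can be sketched with stand-in types. Everything below is hypothetical and heavily simplified: the "container" is a function from path to response text, and the adapter only extracts the path from a raw request line.

```java
import java.util.function.UnaryOperator;

/** Hypothetical stand-ins for ProtocolHandler / Adapter / Container; only the wiring order mirrors initInternal(). */
final class AdapterWiringSketch {

    /** Bridge from the protocol side to the container side. */
    interface Adapter {
        String service(String rawRequestLine);
    }

    /** Protocol side: deals with raw input and knows nothing about the container. */
    static final class ToyProtocolHandler {
        private Adapter adapter;
        private boolean initialized;

        void setAdapter(Adapter adapter) {
            this.adapter = adapter;
        }

        void init() {
            if (adapter == null) {
                throw new IllegalStateException("adapter must be set before init()");
            }
            initialized = true;
        }

        /** Called once a request line has been read off the socket. */
        String process(String rawRequestLine) {
            if (!initialized) {
                throw new IllegalStateException("not initialized");
            }
            return adapter.service(rawRequestLine);
        }
    }

    static final class ToyConnector {
        final ToyProtocolHandler protocolHandler = new ToyProtocolHandler();
        private final UnaryOperator<String> container; // stands in for Engine/Servlet

        ToyConnector(UnaryOperator<String> container) {
            this.container = container;
        }

        void init() {
            // Same order as initInternal(): create adapter, set it, then init the handler
            Adapter adapter = raw -> container.apply(raw.split(" ")[1]);
            protocolHandler.setAdapter(adapter);
            protocolHandler.init();
        }
    }
}
```

The point of the split is that the protocol handler never references container types directly; it only sees the Adapter interface.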

This first creates the CoyoteAdapter, the class that interacts with the Container (and ultimately the Servlet), and then initializes the protocolHandler. Next, the protocolHandler.init() method:

@Override
public void init() throws Exception {
    // Upgrade protocols have to be configured first since the endpoint
    // init (triggered via super.init() below) uses this list to configure
    // the list of ALPN protocols to advertise
    for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
        configureUpgradeProtocol(upgradeProtocol);
    }

    super.init();
}

This configures the upgrade protocols and then calls init() one level up:

@Override
public void init() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
    }

    if (oname == null) {
        // Component not pre-registered so register it
        oname = createObjectName();
        if (oname != null) {
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null, null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length()-1));
    endpoint.setDomain(domain);

    endpoint.init();
}
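The substring(1, length-1) call near the end of this method looks odd in isolation. It makes sense if getName() returns a name wrapped in double quotes for JMX use. To the best of my knowledge the protocol builds it with javax.management.ObjectName.quote, but treat that as an assumption, and the name http-nio-8080 as an example value. A small JDK-only sketch of the round trip:

```java
import javax.management.ObjectName;

/** Illustrates stripping the surrounding quotes from a JMX-quoted name; the class name is hypothetical. */
final class EndpointNameSketch {

    /** Same expression as in init(): drop the first and last character. */
    static String stripQuotes(String quotedName) {
        return quotedName.substring(1, quotedName.length() - 1);
    }

    public static void main(String[] args) {
        String quoted = ObjectName.quote("http-nio-8080"); // "\"http-nio-8080\""
        System.out.println(quoted + " -> " + stripQuotes(quoted));
    }
}
```

This matches how the endpoint later re-adds quotes itself when it builds its own ObjectName from getName().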

The important call here is endpoint.init():

public final void init() throws Exception {
    if (bindOnInit) {
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}

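bindOnInit decides whether the server socket is bound during initialization. It defaults to true, so execution enters the bind method: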

/**
 * Initialize the endpoint.
 */
@Override
public void bind() throws Exception {
    // Initialize the server socket
    initServerSocket();
    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    // Stop latch, one count per poller thread
    setStopLatch(new CountDownLatch(pollerThreadCount));
    // Initialize SSL if needed
    initialiseSsl();
    // Open the selector pool
    selectorPool.open();
}
/**
 * Separated out to make it easier for folks that extend NioEndpoint to implement custom [server]sockets
 */
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress() != null ? new InetSocketAddress(getAddress(), getPort()) : new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    //mimic APR behavior
    serverSock.configureBlocking(true);
}

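bind() initializes the server socket, sets the acceptor and poller thread count defaults, initializes SSL if needed and opens the selector pool.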
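The non-inherited branch of initServerSocket() boils down to three JDK calls. The following is a minimal stand-alone sketch of just that part; the class and method names are hypothetical, and the socketProperties step is left out.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

/** JDK-only sketch of the non-inherited branch of initServerSocket(); names are hypothetical. */
final class ServerSocketSketch {

    static ServerSocketChannel open(int port, int acceptCount) throws IOException {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        // The second argument is the backlog, which is where acceptCount ends up
        serverSock.socket().bind(new InetSocketAddress(port), acceptCount);
        // Blocking mode: accept() waits until a connection arrives
        serverSock.configureBlocking(true);
        return serverSock;
    }
}
```

Although this is the NIO endpoint, the listening channel itself is put in blocking mode, so the acceptor thread simply blocks in accept(). The backlog value is a request to the operating system, which may cap or adjust it.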

startInternal: startup

/**
 * Begin processing requests via this Connector.
 *
 * @exception LifecycleException if a fatal startup error occurs
 */
@Override
protected void startInternal() throws LifecycleException {
    // Validate settings before starting
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }
    setState(LifecycleState.STARTING);
    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}

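Connector's startInternal ends by calling protocolHandler.start(). Between initInternal in the previous step and this one, it is already fairly clear that
the core of the Connector component is the protocolHandler. Its start() method: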

@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
    }
    // AbstractEndpoint
    endpoint.start();
    // Start async timeout thread
    asyncTimeout = new AsyncTimeout();
    Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
    int priority = endpoint.getThreadPriority();
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        priority = Thread.NORM_PRIORITY;
    }
    timeoutThread.setPriority(priority);
    timeoutThread.setDaemon(true);
    timeoutThread.start();
}
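The async timeout thread started here is a daemon thread with a clamped priority that periodically checks for timed-out work. A hedged sketch of that pattern follows; it is not Tomcat's AsyncTimeout. The map of request ids to deadlines, the one-second interval and all names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical periodic timeout checker; data layout and names are illustrative only. */
final class AsyncTimeoutSketch implements Runnable {
    private final Map<String, Long> deadlines = new ConcurrentHashMap<>(); // request id -> deadline (ms)
    private volatile boolean running = true;

    void track(String requestId, long deadlineMillis) {
        deadlines.put(requestId, deadlineMillis);
    }

    /** One scan: remove and return every request whose deadline has passed. */
    List<String> expire(long nowMillis) {
        List<String> expired = new ArrayList<>();
        deadlines.forEach((id, deadline) -> {
            if (deadline <= nowMillis && deadlines.remove(id, deadline)) {
                expired.add(id);
            }
        });
        return expired;
    }

    @Override
    public void run() {
        while (running) {
            expire(System.currentTimeMillis());
            try {
                Thread.sleep(1000); // scan interval, an assumption in this sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running = false;
    }

    /** Starts the checker the way start() does: clamped priority, daemon thread. */
    static Thread startThread(AsyncTimeoutSketch task, String name, int priority) {
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        Thread t = new Thread(task, name);
        t.setPriority(priority);
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

Because the thread is a daemon, it never keeps the JVM alive on its own; stop() only needs to flip the flag and let the loop exit on its next pass.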

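There are two steps here: starting the endpoint and setting up AsyncTimeout. Both have been analyzed before, so this is only a quick pass.
AsyncTimeout periodically checks whether asynchronous requests have timed out. The important part is still endpoint.start():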

public final void start() throws Exception {
    // If the endpoint is still unbound, bind now
    if (bindState == BindState.UNBOUND) {
        // Implemented by the subclass
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    // Also implemented by the subclass
    startInternal();
}

Here we only need to look at the endpoint's startInternal method:

/**
 * Start the NIO endpoint, creating acceptor, poller threads.
 */
@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;
        // Cache of SocketProcessor objects
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        // Cache of PollerEvent objects for the pollers
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        // Cache of NioChannel objects; each channel holds a set of
        // ByteBuffers (two, or four with SSL)
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());
        // Create worker collection
        if (getExecutor() == null) {
            createExecutor();
        }
        // Initialize the connection limit latch
        initializeConnectionLatch();
        // Start poller threads, all of them daemon threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        // Start all Acceptor threads, which listen on the server socket
        startAcceptorThreads();
    }
}
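The three caches created at the top of startInternal() all follow the same recycle pattern: pop an object if one is available, otherwise create a new one, and push it back when done unless the cache is full. The sketch below shows that pattern only. It is not Tomcat's SynchronizedStack; an ArrayDeque is swapped in for brevity, and BoundedCache is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical bounded, synchronized LIFO cache illustrating the recycle pattern. */
final class BoundedCache<T> {
    private final Deque<T> stack = new ArrayDeque<>();
    private final int limit; // negative means unlimited in this sketch

    BoundedCache(int limit) {
        this.limit = limit;
    }

    /** Returns false (and drops the object) when the cache is already full. */
    synchronized boolean push(T obj) {
        if (limit >= 0 && stack.size() >= limit) {
            return false;
        }
        stack.push(obj);
        return true;
    }

    /** Returns the most recently pushed object, or null when the cache is empty. */
    synchronized T pop() {
        return stack.poll();
    }
}
```

A caller would typically write something like: take `cache.pop()`, fall back to `new` when it returns null, use the object, reset it, then `cache.push(obj)`. Rejected pushes are simply left to the garbage collector.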

This mainly creates the acceptor and poller threads. Let's look at each in turn:

Acceptor

Its main job is to accept new connections, wrap them, and register them with a Poller.

Poller

First, the constructor:

public Poller() throws IOException {
    this.selector = Selector.open();
}

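As you can see, each Poller owns one Selector, which it uses to detect sockets that are ready. By default there are at most 2 Pollers. This is only a brief introduction; a detailed analysis will follow in a later article.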
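The Acceptor-to-Poller hand-off can be tried out with plain JDK NIO. The sketch below is a miniature under several assumptions: one acceptor and one poller, a simple queue for the hand-off, and "processing" reduced to collecting whatever text was read. None of these classes are Tomcat's.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical miniature of the Acceptor -> Poller hand-off; these are not Tomcat's classes. */
final class MiniNioEndpoint implements AutoCloseable {
    private final ServerSocketChannel serverSock;
    private final Selector selector; // one Selector per poller
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    final BlockingQueue<String> received = new LinkedBlockingQueue<>();
    private volatile boolean running;

    MiniNioEndpoint() throws IOException {
        serverSock = ServerSocketChannel.open();
        serverSock.bind(new InetSocketAddress("127.0.0.1", 0)); // any free port
        serverSock.configureBlocking(true);                      // accept() blocks
        selector = Selector.open();
    }

    int port() { return serverSock.socket().getLocalPort(); }

    void start() {
        running = true;
        Thread poller = new Thread(this::pollLoop, "mini-ClientPoller-0");
        Thread acceptor = new Thread(this::acceptLoop, "mini-Acceptor");
        poller.setDaemon(true);
        acceptor.setDaemon(true);
        poller.start();
        acceptor.start();
    }

    /** Acceptor: accept new connections and queue them for the poller. */
    private void acceptLoop() {
        while (running) {
            try {
                SocketChannel ch = serverSock.accept();
                ch.configureBlocking(false);
                pending.add(ch);
                selector.wakeup(); // the poller registers the channel on its next pass
            } catch (IOException e) {
                // server socket closed or accept failed; the loop re-checks 'running'
            }
        }
    }

    /** Poller: register queued channels, then read from whichever are ready. */
    private void pollLoop() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        while (running) {
            try {
                SocketChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.register(selector, SelectionKey.OP_READ);
                }
                selector.select(500);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    readFrom(key, buf);
                }
            } catch (IOException | ClosedSelectorException e) {
                // selector closed during shutdown; the loop re-checks 'running'
            }
        }
    }

    private void readFrom(SelectionKey key, ByteBuffer buf) {
        SocketChannel sc = (SocketChannel) key.channel();
        try {
            buf.clear();
            int n = sc.read(buf);
            if (n > 0) received.add(new String(buf.array(), 0, n, StandardCharsets.UTF_8));
            if (n < 0) sc.close(); // peer closed; closing also cancels the key
        } catch (IOException e) {
            try { sc.close(); } catch (IOException ignored) { }
        }
    }

    @Override
    public void close() throws IOException {
        running = false;
        serverSock.close();
        selector.close();
    }
}
```

In the real endpoint the poller does not read the data itself; as the closing paragraph of this article notes, ready sockets are handed on to worker threads. The sketch collapses that step to keep the hand-off itself visible.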

stopInternal: shutdown

/**
 * Terminate processing requests via this Connector.
 *
 * @exception LifecycleException if a fatal shutdown error occurs
 */
@Override
protected void stopInternal() throws LifecycleException {
    setState(LifecycleState.STOPPING);
    try {
        if (protocolHandler != null) {
            protocolHandler.stop();
        }
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
    }
}

Again this delegates to the protocolHandler, this time to its stop() method:

@Override
public void stop() throws Exception {
    if(getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
    }
    if (asyncTimeout != null) {
        asyncTimeout.stop();
    }
    endpoint.stop();
}

This stops the async timeout thread and then stops the endpoint. The stop method in AbstractEndpoint:

public final void stop() throws Exception {
    stopInternal();
    if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
        unbind();
        bindState = BindState.UNBOUND;
    }
}

This calls NioEndpoint's stopInternal and then, depending on bindState, unbind. Both will be analyzed in a later article, so we skip them for now.

destroyInternal: destruction

@Override
protected void destroyInternal() throws LifecycleException {
    try {
        if (protocolHandler != null) {
            protocolHandler.destroy();
        }
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerDestroyFailed"), e);
    }

    if (getService() != null) {
        getService().removeConnector(this);
    }

    super.destroyInternal();
}

Besides calling protocolHandler.destroy(), this also removes the association between the Service and the current Connector.
protocolHandler.destroy(), followed by the endpoint's destroy():

// AbstractProtocol
@Override
public void destroy() throws Exception {
    if(getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
    }

    try {
        endpoint.destroy();
    } finally {
        if (oname != null) {
            if (mserver == null) {
                Registry.getRegistry(null, null).unregisterComponent(oname);
            } else {
                // Possibly registered with a different MBeanServer
                try {
                    mserver.unregisterMBean(oname);
                } catch (MBeanRegistrationException | InstanceNotFoundException e) {
                    getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
                            oname, mserver));
                }
            }
        }

        if (rgOname != null) {
            Registry.getRegistry(null, null).unregisterComponent(rgOname);
        }
    }
}
// AbstractEndpoint
public final void destroy() throws Exception {
    if (bindState == BindState.BOUND_ON_INIT) {
        unbind();
        bindState = BindState.UNBOUND;
    }
    Registry registry = Registry.getRegistry(null, null);
    registry.unregisterComponent(oname);
    for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
        unregisterJmx(sslHostConfig);
    }
}
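Putting the bindState checks from the endpoint's init(), start(), stop() and destroy() side by side shows a symmetric rule: a socket bound in init() is released in destroy(), while a socket bound in start() is released in stop(). The sketch below models only that rule. It omits SOCKET_CLOSED_ON_STOP and all real socket work, and the class name is hypothetical.

```java
/** Hypothetical model of the endpoint's bind lifecycle; no real sockets involved. */
final class BindLifecycleSketch {
    enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START }

    private final boolean bindOnInit;
    private BindState bindState = BindState.UNBOUND;
    private boolean socketBound; // stands in for the real server socket

    BindLifecycleSketch(boolean bindOnInit) {
        this.bindOnInit = bindOnInit;
    }

    void init() {
        if (bindOnInit) {
            socketBound = true;
            bindState = BindState.BOUND_ON_INIT;
        }
    }

    void start() {
        if (bindState == BindState.UNBOUND) {
            socketBound = true;
            bindState = BindState.BOUND_ON_START;
        }
    }

    void stop() {
        if (bindState == BindState.BOUND_ON_START) {
            socketBound = false;
            bindState = BindState.UNBOUND;
        }
    }

    void destroy() {
        if (bindState == BindState.BOUND_ON_INIT) {
            socketBound = false;
            bindState = BindState.UNBOUND;
        }
    }

    boolean isBound() {
        return socketBound;
    }
}
```

One practical consequence, under this model: with bindOnInit left at true, stopping the connector does not release the port; only destroying it does.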

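That is as far as this article goes. One important piece of logic has not been analyzed: the Acceptor accepts a connection and hands it to a Poller, which passes it on to a worker thread that wraps the request
and finally invokes the container's pipeline to process it.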