startInternal in each component

When Tomcat starts, it calls Catalina's start method, which in turn calls the start method of the Server component it holds:

// Start the new server
try {
    // Call the Server's start method
    getServer().start();
} catch (LifecycleException e) {
    log.fatal(sm.getString("catalina.serverStartFail"), e);
    try {
        // On failure, call the Server's destroy method
        getServer().destroy();
    } catch (LifecycleException e1) {
        log.debug("destroy() failed for failed Server ", e1);
    }
    return;
}

This is also a lifecycle method, and it eventually calls StandardServer's startInternal method:

StandardServer's startInternal

protected void startInternal() throws LifecycleException {
    // Fire CONFIGURE_START_EVENT
    fireLifecycleEvent(CONFIGURE_START_EVENT, null);
    setState(LifecycleState.STARTING);
    // Start the NamingResourcesImpl lifecycle
    globalNamingResources.start();
    // Start our defined Services
    synchronized (servicesLock) {
        for (int i = 0; i < services.length; i++) {
            // Start each Service in turn
            services[i].start();
        }
    }
}
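
The way a public start() call ends up in each component's startInternal() can be sketched as a template method. This is a minimal sketch under assumptions, not Tomcat's code: Component, Server, Service and the State enum are hypothetical, simplified stand-ins for the lifecycle classes, and event firing and most state transitions are left out.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    enum State { NEW, STARTING, STARTED, FAILED }

    // Hypothetical, simplified lifecycle base class (not Tomcat's own).
    static abstract class Component {
        private State state = State.NEW;
        protected final List<String> log;

        Component(List<String> log) { this.log = log; }

        // Public entry point: skips repeated starts, then delegates to the subclass hook.
        final void start() {
            if (state != State.NEW) {
                return;
            }
            try {
                startInternal();
                state = State.STARTED;
            } catch (RuntimeException e) {
                state = State.FAILED;
                throw e;
            }
        }

        protected void setState(State state) { this.state = state; }
        State getState() { return state; }

        // Component-specific startup work.
        protected abstract void startInternal();
    }

    static class Service extends Component {
        final String name;
        Service(String name, List<String> log) { super(log); this.name = name; }
        @Override protected void startInternal() {
            setState(State.STARTING);
            log.add("service:" + name);
        }
    }

    static class Server extends Component {
        final List<Service> services = new ArrayList<>();
        Server(List<String> log) { super(log); }
        @Override protected void startInternal() {
            setState(State.STARTING);
            log.add("server");
            for (Service service : services) {
                service.start();   // the call propagates to the next layer
            }
        }
    }

    // Starts a server with one service and returns the recorded start order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Server server = new Server(log);
        server.services.add(new Service("Catalina", log));
        server.start();
        server.start();            // the second call is ignored
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the pattern is that callers only ever see start(), while each component puts its own work in startInternal().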

The call is then passed on to the Service component's startInternal:

StandardService's startInternal

protected void startInternal() throws LifecycleException {

    if (log.isInfoEnabled())
        log.info(sm.getString("standardService.start.name", this.name));
    setState(LifecycleState.STARTING);

    // Start our defined Container first
    if (engine != null) {
        synchronized (engine) {
            engine.start();
        }
    }

    synchronized (executors) {
        for (Executor executor : executors) {
            executor.start();
        }
    }

    mapperListener.start();

    // Start our defined Connectors second
    synchronized (connectorsLock) {
        for (Connector connector : connectors) {
            // If it has already failed, don't try and start it
            if (connector.getState() != LifecycleState.FAILED) {
                connector.start();
            }
        }
    }
}
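
The start order in the listing above (engine, executors, mapper listener, then connectors, skipping connectors that have already failed) can be sketched as follows. Part, State and startService are hypothetical names introduced only for this illustration; locking and logging are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ServiceStartOrderSketch {
    enum State { NEW, STARTED, FAILED }

    // Hypothetical stand-in for any startable part of a service.
    static class Part {
        final String name;
        State state;
        Part(String name, State state) { this.name = name; this.state = state; }
        void start(List<String> order) {
            order.add(name);
            state = State.STARTED;
        }
    }

    // Mirrors the order in the listing: engine, executors, mapper listener, connectors.
    static List<String> startService(Part engine, List<Part> executors,
                                     Part mapperListener, List<Part> connectors) {
        List<String> order = new ArrayList<>();
        if (engine != null) {
            engine.start(order);
        }
        for (Part executor : executors) {
            executor.start(order);
        }
        mapperListener.start(order);
        for (Part connector : connectors) {
            // A connector that has already failed is not started
            if (connector.state != State.FAILED) {
                connector.start(order);
            }
        }
        return order;
    }

    static List<String> demo() {
        Part http = new Part("http-connector", State.NEW);
        Part ajp = new Part("ajp-connector", State.FAILED);
        return startService(new Part("engine", State.NEW),
                Arrays.asList(new Part("executor", State.NEW)),
                new Part("mapperListener", State.NEW),
                Arrays.asList(http, ajp));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Starting the container before the connectors plausibly ensures that nothing accepts requests before the components that handle them are ready.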

The start call continues down to the next layer:

StandardEngine's startInternal

protected synchronized void startInternal() throws LifecycleException {

    // Log our server identification information
    if (log.isInfoEnabled())
        log.info("Starting Servlet Engine: " + ServerInfo.getServerInfo());

    // Standard container startup
    super.startInternal();
}

Connector's startInternal

protected void startInternal() throws LifecycleException {

    // Validate settings before starting
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}

Now we need to look at the protocolHandler.start() method.

protocolHandler.start

The start method here is not a Lifecycle method; it is declared on the ProtocolHandler interface:

public interface ProtocolHandler {
    // The adapter, used to call the connector.
    public void setAdapter(Adapter adapter);
    public Adapter getAdapter();
    // The executor, provide access to the underlying thread pool.
    public Executor getExecutor();
    // Initialise the protocol.
    public void init() throws Exception;
    // Start the protocol.
    public void start() throws Exception;
    // Pause the protocol (optional).
    public void pause() throws Exception;
    // Resume the protocol (optional).
    public void resume() throws Exception;
    // Stop the protocol.
    public void stop() throws Exception;
    // Destroy the protocol (optional).
    public void destroy() throws Exception;
    /**
     * Close the server socket (to prevent further connections) if the server
     * socket was bound on {@link #start()} (rather than on {@link #init()}
     * but do not perform any further shutdown.
     */
    public void closeServerSocketGraceful();
    /**
     * Requires APR/native library
     *
     * @return <code>true</code> if this Protocol Handler requires the
     *         APR/native library, otherwise <code>false</code>
     */
    public boolean isAprRequired();
    /**
     * Does this ProtocolHandler support sendfile?
     *
     * @return <code>true</code> if this Protocol Handler supports sendfile,
     *         otherwise <code>false</code>
     */
    public boolean isSendfileSupported();
    public void addSslHostConfig(SSLHostConfig sslHostConfig);
    public SSLHostConfig[] findSslHostConfigs();
    public void addUpgradeProtocol(UpgradeProtocol upgradeProtocol);
    public UpgradeProtocol[] findUpgradeProtocols();
}

AbstractProtocol implements the start method:

@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
    }
    // endpoint is an AbstractEndpoint
    endpoint.start();
    // Start async timeout thread
    asyncTimeout = new AsyncTimeout();
    Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
    int priority = endpoint.getThreadPriority();
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        priority = Thread.NORM_PRIORITY;
    }
    timeoutThread.setPriority(priority);
    timeoutThread.setDaemon(true);
    timeoutThread.start();
}

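There are two steps here:
1. endpoint.start()
2. Start a daemon thread, AsyncTimeout, that runs in the background
At first glance this is confusing, so the sections below look at what each of them actually does.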
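
The second step, starting a named daemon thread whose priority falls back to the default when the configured value is out of range, can be sketched in isolation. normalizePriority and startDaemon are hypothetical helper names used only for this illustration.

```java
class DaemonThreadSketch {
    // Falls back to NORM_PRIORITY when the requested value is outside the valid range.
    static int normalizePriority(int requested) {
        if (requested < Thread.MIN_PRIORITY || requested > Thread.MAX_PRIORITY) {
            return Thread.NORM_PRIORITY;
        }
        return requested;
    }

    // Creates and starts a named daemon thread for a background task.
    static Thread startDaemon(Runnable task, String name, int requestedPriority) {
        Thread thread = new Thread(task, name);
        thread.setPriority(normalizePriority(requestedPriority));
        thread.setDaemon(true);   // a daemon thread does not keep the JVM alive
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = startDaemon(() -> System.out.println("timeout checker running"),
                "demo-AsyncTimeout", 42);
        t.join();
        System.out.println("daemon=" + t.isDaemon());
    }
}
```

Note that the out-of-range value is replaced by the default rather than clamped to the nearest bound, which matches the check in the listing above.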

endpoint.start()

Where endpoint comes from

First, note that the endpoint field is declared as an AbstractEndpoint:

/**
 * Endpoint that provides low-level network I/O - must be matched to the
 * ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO
 * Endpoint etc.).
 */
private final AbstractEndpoint<S,?> endpoint;

public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
    this.endpoint = endpoint;
    // The default linger value is -1
    setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
    // The default for TCP_NODELAY is true
    setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}

As shown, endpoint is passed in through AbstractProtocol's constructor. Let's see who calls it:
//TODO

This looks familiar: the server.xml configuration file declares a protocol on each Connector:

<Connector port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443" />
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

Next, look at the rule used to parse Connector:

digester.addRule("Server/Service/Connector",
new ConnectorCreateRule());

In ConnectorCreateRule we find the following statement:

Connector con = new Connector(attributes.getValue("protocol"));

So this is where the protocol name is read. Next, look at Connector's constructor:

public Connector(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
        }
    } else {
        protocolHandlerClassName = protocol;
    }

    // Instantiate protocol handler
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    // Default for Connector depends on this system property
    setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}

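So this is where it happens: the protocol name is mapped to the fully qualified class name of a protocol handler, and reflection is then used to create the ProtocolHandler that matches the Connector's protocol.
Assuming the protocol is "HTTP/1.1" and the APR connector is not in use (aprConnector is false), the constructor of org.apache.coyote.http11.Http11NioProtocol is executed: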

public Http11NioProtocol() {
    super(new NioEndpoint());
}

It creates a new NioEndpoint and passes it to the parent class constructor:

public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) {
    super(endpoint);
}

The endpoint is passed further up, to AbstractHttp11Protocol:

public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}

And one more level up, to AbstractProtocol:

public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
    this.endpoint = endpoint;
    setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
    setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}

Now we know where endpoint comes from: it is the NioEndpoint created at the very beginning of the constructor chain.
A quick look at the class hierarchy:
//TODO
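
The two mechanisms traced in this section, resolving a protocol name to a class name that is instantiated by reflection, and passing the endpoint up through the constructor chain, can be put together in a small sketch. Handler, BaseHandler, NioHandler and AjpHandler are hypothetical classes invented for this example, and the endpoint is reduced to a plain string.

```java
class ProtocolLookupSketch {
    interface Handler {
        String endpointName();
    }

    // Base class that receives the endpoint from its subclasses' constructors.
    static abstract class BaseHandler implements Handler {
        private final String endpoint;
        BaseHandler(String endpoint) { this.endpoint = endpoint; }
        @Override public String endpointName() { return endpoint; }
    }

    public static class NioHandler extends BaseHandler {
        public NioHandler() { super("nio-endpoint"); }   // the subclass picks the endpoint
    }

    public static class AjpHandler extends BaseHandler {
        public AjpHandler() { super("ajp-endpoint"); }
    }

    // Maps a well-known protocol name to a class name; anything else is used as a class name.
    static String resolveClassName(String protocol) {
        if (protocol == null || "HTTP/1.1".equals(protocol)) {
            return NioHandler.class.getName();
        } else if ("AJP/1.3".equals(protocol)) {
            return AjpHandler.class.getName();
        }
        return protocol;
    }

    // Instantiates the handler through its public no-argument constructor.
    static Handler create(String protocol) {
        try {
            Class<?> clazz = Class.forName(resolveClassName(protocol));
            return (Handler) clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return null;   // this sketch simply reports failure as null
        }
    }

    public static void main(String[] args) {
        System.out.println(create("HTTP/1.1").endpointName());
        System.out.println(create("AJP/1.3").endpointName());
        System.out.println(create("no.such.Handler"));
    }
}
```

Because the fallback branch treats the protocol value as a class name, a custom handler can be plugged in just by naming its class in the configuration.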

endpoint.start

Now look at NioEndpoint's start method, which resolves to the method in the parent class AbstractEndpoint:

public final void start() throws Exception {
    // Bind only if the endpoint is not bound yet
    if (bindState == BindState.UNBOUND) {
        // Implemented by the subclass
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    // Also implemented by the subclass
    startInternal();
}
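
The "bind only if still unbound" check in the listing above can be illustrated with a small sketch. UNBOUND and BOUND_ON_START appear in the listing; the BOUND_ON_INIT value and the init() method are assumptions added here only to show a case in which start() must not bind again. The Endpoint class is a hypothetical stand-in.

```java
class BindOnceSketch {
    // UNBOUND and BOUND_ON_START come from the listing; BOUND_ON_INIT is assumed here.
    enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START }

    static class Endpoint {
        BindState bindState = BindState.UNBOUND;
        int bindCalls;
        int startCalls;

        // Binding during init is modeled only to show why start() has to check the state.
        void init() {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }

        final void start() {
            if (bindState == BindState.UNBOUND) {
                bind();
                bindState = BindState.BOUND_ON_START;
            }
            startInternal();
        }

        void bind() { bindCalls++; }            // a real endpoint opens its server socket here
        void startInternal() { startCalls++; }  // a real endpoint starts its threads here
    }

    public static void main(String[] args) {
        Endpoint lazy = new Endpoint();
        lazy.start();
        Endpoint eager = new Endpoint();
        eager.init();
        eager.start();
        System.out.println(lazy.bindState + " " + lazy.bindCalls);
        System.out.println(eager.bindState + " " + eager.bindCalls);
    }
}
```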

The two key methods here, bind() and startInternal(), are both implemented by the subclass NioEndpoint:

bind()
@Override
public void bind() throws Exception {
    // Initialize the server socket
    initServerSocket();
    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        // minimum one poller thread
        pollerThreadCount = 1;
    }
    // Countdown latch sized to the number of poller threads
    setStopLatch(new CountDownLatch(pollerThreadCount));
    // Initialize SSL if needed
    initialiseSsl();
    // Open the selector pool
    selectorPool.open();
}

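Several concepts are involved here:

  • acceptor
    Listens on the server socket and hands accepted connections over to a Poller thread.
  • poller
    The Poller threads poll the connected sockets with few resources in order to keep the connections alive, and hand a socket over to a worker thread when data is available.
  • selectorPool
    Manages the selectors used by NIO.

How these cooperate in detail is analyzed later. For now, look at the initServerSocket method: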
/**
 * Separated out to make it easier for folks that extend NioEndpoint to implement custom [server]sockets
 */
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress() != null ? new InetSocketAddress(getAddress(), getPort()) : new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    // mimic APR behavior
    serverSock.configureBlocking(true);
}

These are the actual NIO operations; NIO network programming will be studied separately later.
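
As a standalone illustration of the non-inherited branch of initServerSocket, the sketch below opens a ServerSocketChannel, binds it with a backlog and puts it in blocking mode using only the standard java.nio API. The class and helper names are hypothetical, and port 0 is used so that the operating system picks a free port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class ServerSocketSketch {
    // Opens a server channel, binds it with a backlog, and switches it to blocking mode.
    static ServerSocketChannel open(int port, int backlog) {
        try {
            ServerSocketChannel channel = ServerSocketChannel.open();
            channel.socket().bind(new InetSocketAddress(port), backlog);
            channel.configureBlocking(true);   // accept() blocks until a client connects
            return channel;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the port the channel is actually bound to.
    static int localPort(ServerSocketChannel channel) {
        return channel.socket().getLocalPort();
    }

    static void close(ServerSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ServerSocketChannel channel = open(0, 100);
        System.out.println("bound to port " + localPort(channel)
                + ", blocking=" + channel.isBlocking());
        close(channel);
    }
}
```

The backlog argument plays the role that getAcceptCount() plays in the listing: it is a hint for how many pending connections the operating system may queue.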
startInternal()
/**
 * Start the NIO endpoint, creating acceptor, poller threads.
 */
@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;
        // Cache of SocketProcessor objects
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        // Cache of PollerEvent objects for the poller
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        // Cache of NioChannel objects; each channel holds a set of ByteBuffers
        // (two, or four with SSL)
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());
        // Create worker collection
        if (getExecutor() == null) {
            createExecutor();
        }
        // Initialize the connection limit latch
        initializeConnectionLatch();
        // Start poller threads; they are all daemon threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        // Start all Acceptor threads, which listen on the server socket
        startAcceptorThreads();
    }
}

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

This method mainly creates the acceptor and poller threads. The roles of these threads are described in detail in the next article.
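
Until then, the division of labour between the threads can be pictured with a deliberately tiny model built only on java.nio. This is a sketch under assumptions and not how NioEndpoint is implemented: one acceptor thread accepts connections, one poller thread waits for readable sockets with a Selector, and a worker pool echoes the data back. All class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AcceptorPollerSketch {
    // Worker task: read what is available, echo it back, then close the connection.
    static void echo(SocketChannel channel) {
        try (SocketChannel ch = channel) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            if (ch.read(buffer) > 0) {
                buffer.flip();
                while (buffer.hasRemaining()) {
                    ch.write(buffer);
                }
            }
        } catch (IOException e) {
            // Connection problems are ignored in this sketch
        }
    }

    // Acceptor loop: blocking accept, then queue the connection for the poller.
    static void acceptLoop(ServerSocketChannel server, Queue<SocketChannel> pending,
                           Selector selector) {
        try {
            while (true) {
                SocketChannel ch = server.accept();
                ch.configureBlocking(false);
                pending.add(ch);
                selector.wakeup();
            }
        } catch (IOException e) {
            // Server channel closed: the acceptor exits
        }
    }

    // Poller loop: register queued connections, wait for readable ones, hand them to workers.
    static void pollLoop(Selector selector, Queue<SocketChannel> pending,
                         ExecutorService workers) {
        try {
            while (selector.isOpen()) {
                SocketChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.register(selector, SelectionKey.OP_READ);
                }
                selector.select(1000);
                for (SelectionKey key : selector.selectedKeys()) {
                    key.cancel();   // one-shot: the worker owns the channel from here on
                    SocketChannel ready = (SocketChannel) key.channel();
                    workers.execute(() -> echo(ready));
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException | RuntimeException e) {
            // Selector closed: the poller exits
        }
    }

    // Sends a non-empty message through the three roles and returns the echoed reply.
    static String roundTrip(String message) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
            Thread acceptor = new Thread(() -> acceptLoop(server, pending, selector), "sketch-Acceptor-0");
            Thread poller = new Thread(() -> pollLoop(selector, pending, workers), "sketch-ClientPoller-0");
            acceptor.setDaemon(true);
            poller.setDaemon(true);
            poller.start();
            acceptor.start();
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (Socket client = new Socket("127.0.0.1", port)) {
                client.setSoTimeout(5000);
                client.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
                return new String(client.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

The queue between the acceptor and the poller keeps the registration on the poller's own thread, so the acceptor never touches the selector's key set directly.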

AsyncTimeout

Back in AbstractProtocol's start method, let's see what AsyncTimeout does. AsyncTimeout implements the Runnable interface, so go straight to its run method:

/**
 * The background thread that checks async requests and fires the
 * timeout if there has been no activity.
 */
@Override
public void run() {
    // Loop until we receive a shutdown command
    while (asyncTimeoutRunning) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Ignore
        }
        long now = System.currentTimeMillis();
        for (Processor processor : waitingProcessors) {
            processor.timeoutAsync(now);
        }
        // Loop if endpoint is paused
        while (endpoint.isPaused() && asyncTimeoutRunning) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
    }
}

It detects async requests that have timed out and dispatches them to the worker thread pool for processing.
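
The periodic check can be sketched independently of Tomcat. In this minimal sketch, WaitingTask and TimeoutChecker are hypothetical stand-ins for the waiting processors and the AsyncTimeout runnable, the interval is shortened from one second to a few milliseconds, and a timed-out task is only flagged instead of being dispatched to a worker pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class AsyncTimeoutSketch {
    // Hypothetical stand-in for a processor that is waiting on an async request.
    static class WaitingTask {
        final long deadlineMillis;
        volatile boolean timedOut;
        WaitingTask(long deadlineMillis) { this.deadlineMillis = deadlineMillis; }
        void timeoutAsync(long now) {
            if (!timedOut && now >= deadlineMillis) {
                timedOut = true;   // a real server would dispatch a timeout event here
            }
        }
    }

    // Background loop: sleep, then offer the current time to every waiting task.
    static class TimeoutChecker implements Runnable {
        final Set<WaitingTask> waiting = ConcurrentHashMap.newKeySet();
        final long intervalMillis;
        volatile boolean running = true;
        TimeoutChecker(long intervalMillis) { this.intervalMillis = intervalMillis; }

        @Override public void run() {
            while (running) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Ignored, as in the listing above
                }
                long now = System.currentTimeMillis();
                for (WaitingTask task : waiting) {
                    task.timeoutAsync(now);
                }
            }
        }
    }

    // Returns { shortTaskTimedOut, longTaskTimedOut } after letting the checker run briefly.
    static boolean[] demo() {
        TimeoutChecker checker = new TimeoutChecker(10);
        long now = System.currentTimeMillis();
        WaitingTask shortTask = new WaitingTask(now + 30);
        WaitingTask longTask = new WaitingTask(now + 60_000);
        checker.waiting.add(shortTask);
        checker.waiting.add(longTask);
        Thread thread = new Thread(checker, "sketch-AsyncTimeout");
        thread.setDaemon(true);
        thread.start();
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        checker.running = false;
        return new boolean[] { shortTask.timedOut, longTask.timedOut };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("short=" + result[0] + ", long=" + result[1]);
    }
}
```

Polling on a fixed interval means a timeout can fire up to one interval late, which is usually acceptable for request timeouts measured in seconds.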

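Brief summary

This article traced how Catalina's start method calls the start method of the Server component it holds. start is a lifecycle method, and each component performs some initialization work as the call propagates down.
The most important part is starting the Connector component: it opens the local server socket, then creates and starts the acceptor and poller threads that wait for client connections.
How these threads work together is left for a later analysis.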