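Introduction

The Acceptor listens on the configured port, obtains a SocketChannel, wraps it in a NioChannel, and registers it with the Poller;
The Poller receives the NioChannel from the Acceptor, wraps it in a PollerEvent, and puts it on its local queue;
The Poller consumes the PollerEvents from its local queue, builds a SocketProcessor object, and hands it to the worker thread pool (Executor).

This article analyzes how the worker thread pool handles the SocketProcessor object.
First, here is processSocket, the method where the Poller hands off to the worker thread pool: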

/**
* Process the given SocketWrapper with the given status. Used to trigger
* processing as if the Poller (for those endpoints that have one)
* selected the socket.
*
* @param socketWrapper The socket wrapper to process
* @param event The socket event to be processed
* @param dispatch Should the processing be performed on a new
* container thread
*
* @return if processing was triggered successfully
*/
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
// Try to reuse a SocketProcessor from the cache
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
// Cache is empty: create a new SocketProcessor
sc = createSocketProcessor(socketWrapper, event);
} else {
// Reset the cached instance for this socket and event
sc.reset(socketWrapper, event);
}
// Get the worker thread pool
Executor executor = getExecutor();
if (dispatch && executor != null) {
// Run the SocketProcessor task on the thread pool
executor.execute(sc);
} else {
// Run the SocketProcessor's run method on the current thread
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
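
The hand-off above combines two ideas: reuse a task object from a cache when one is available, and run it either on a pool or inline. The following is a minimal sketch of that pattern, not Tomcat code. The class RecyclingDispatcher, its Task type, and the String payload are hypothetical names invented for illustration, and a ConcurrentLinkedDeque stands in for Tomcat's processorCache.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the processSocket hand-off (not Tomcat code).
class RecyclingDispatcher {
    // A reusable task; reset() rebinds it to a new payload.
    final class Task implements Runnable {
        private String payload;

        void reset(String payload) {
            this.payload = payload;
        }

        @Override
        public void run() {
            try {
                handled.add(payload);
            } finally {
                payload = null;
                cache.push(this); // return the task to the cache for reuse
            }
        }
    }

    final ConcurrentLinkedDeque<Task> cache = new ConcurrentLinkedDeque<>();
    final ConcurrentLinkedDeque<String> handled = new ConcurrentLinkedDeque<>();
    final AtomicInteger created = new AtomicInteger();
    private final Executor executor; // may be null, meaning "always run inline"

    RecyclingDispatcher(Executor executor) {
        this.executor = executor;
    }

    boolean dispatch(String payload, boolean dispatchToPool) {
        if (payload == null) {
            return false;
        }
        Task task = cache.poll(); // null when the cache is empty
        if (task == null) {
            task = new Task();
            created.incrementAndGet();
        }
        task.reset(payload);
        try {
            if (dispatchToPool && executor != null) {
                executor.execute(task); // run on the pool
            } else {
                task.run();             // run on the calling thread
            }
        } catch (RejectedExecutionException e) {
            return false;               // pool refused the task
        }
        return true;
    }

    public static void main(String[] args) {
        RecyclingDispatcher d = new RecyclingDispatcher(null);
        d.dispatch("first", true);
        d.dispatch("second", true);
        System.out.println("tasks created: " + d.created.get() + ", handled: " + d.handled);
    }
}
```

With no executor configured, both dispatches run inline and the second one reuses the task object created by the first.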

The processSocket method lives in the abstract class AbstractEndpoint. The executor it uses is first created in startInternal, the lifecycle hook of NioEndpoint:

 @Override
public void startInternal() throws Exception {
if (!running) {
// omitted ...
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
// Initialize the connection limit latch
initializeConnectionLatch();
// Start poller threads; all of them are daemon threads
pollers = new Poller[getPollerThreadCount()];
for (int i = 0; i < pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// Start all Acceptor threads, which listen on the server socket
startAcceptorThreads();
}
}
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
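
To make the shape of createExecutor concrete, here is a rough sketch using only the JDK's java.util.concurrent classes. It is an approximation under stated assumptions: Tomcat uses its own ThreadPoolExecutor, TaskQueue, and TaskThreadFactory, whereas the class WorkerPoolSketch and the LinkedBlockingQueue below are stand-ins chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only approximation of a named, daemon worker pool (not Tomcat code).
class WorkerPoolSketch {
    static ThreadPoolExecutor create(String namePrefix, int minSpare, int max) {
        AtomicInteger counter = new AtomicInteger(1);
        // Name each worker "<prefix><n>" and mark it as a daemon thread.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, namePrefix + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        // Idle threads above the core size are reclaimed after 60 seconds.
        return new ThreadPoolExecutor(minSpare, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), factory);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create("demo-exec-", 2, 4);
        String name = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
        System.out.println("task ran on " + name);
        pool.shutdown();
    }
}
```

One difference worth keeping in mind: a plain JDK pool with an unbounded queue never grows past its core size, because new tasks are queued instead. As far as I understand, Tomcat's TaskQueue exists to change that, letting the pool grow toward maxThreads before tasks start to queue, which is why the listing wires the queue back to its parent executor with setParent.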

SocketProcessor implements the Runnable interface, so its run method is the entry point for this article:

@Override
public final void run() {
synchronized (socketWrapper) {
// It is possible that processing may be triggered for read and
// write at the same time. The sync above makes sure that processing
// does not occur in parallel. The test below ensures that if the
// first event to be processed results in the socket being closed,
// the subsequent events are not processed.
if (socketWrapper.isClosed()) {
return;
}
doRun();
}
}
@Override
protected void doRun() {
// Get the client connection
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

try {
// TLS (HTTPS) handshake state
int handshake = -1;

try {
if (key != null) {
if (socket.isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socket.handshake(key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake", x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
// handshake == 0 means the handshake is complete or was not needed
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
// Default to handling a read event
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
// Handle the specified event
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ) {
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE) {
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error("", t);
socket.getPoller().cancelledKey(key);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && !paused) {
// Cache this processor for reuse
processorCache.push(this);
}
}
}
}
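
The second half of doRun is essentially a four-way branch on the handshake value: 0 means process the event, -1 means close, and the two SelectionKey constants mean "wait for more I/O". The sketch below restates that branch as a pure function so the mapping is easy to see. The class HandshakeDispatchSketch and the Action enum are hypothetical names for illustration, and only java.nio.channels.SelectionKey is a real API here.

```java
import java.nio.channels.SelectionKey;

// Hypothetical restatement of the handshake branch in doRun (not Tomcat code).
class HandshakeDispatchSketch {
    enum Action { PROCESS, CLOSE, REGISTER_READ, REGISTER_WRITE, NONE }

    static Action next(int handshake) {
        if (handshake == 0) {
            return Action.PROCESS;        // handshake done or not needed
        } else if (handshake == -1) {
            return Action.CLOSE;          // handshake failed
        } else if (handshake == SelectionKey.OP_READ) {
            return Action.REGISTER_READ;  // handshake needs more input
        } else if (handshake == SelectionKey.OP_WRITE) {
            return Action.REGISTER_WRITE; // handshake needs to write
        }
        return Action.NONE;               // any other value: nothing to do
    }

    public static void main(String[] args) {
        System.out.println(next(0));
        System.out.println(next(SelectionKey.OP_READ));
    }
}
```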

Look at this line from doRun:

// Handle the specified event
state = getHandler().process(socketWrapper, event);

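getHandler() returns a ConnectionHandler, which implements the AbstractEndpoint.Handler interface and is defined inside the abstract class AbstractProtocol.
Its process method is called next. That method is very long, so only a few excerpts are shown here: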

@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// omitted ...

S socket = wrapper.getSocket();

// omitted ...
try {
// omitted ...
// Create a new Http11Processor if none is available
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}

// omitted ...
SocketState state = SocketState.CLOSED;
// Keep looping while the state is SocketState.UPGRADING
do {
// Core of the processing
state = processor.process(wrapper, status);
// omitted ...

} while (state == SocketState.UPGRADING);

// omitted ...
}


Once the Http11Processor has been obtained, its process method is called. That method is defined in the parent class AbstractProcessorLight:

@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {

SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
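// Process in-flight requests that are not in standard HTTP mode.
// Current uses include Servlet 3.0 async and HTTP upgraded connections;
// more uses may be added in the future. These usually start as an HTTP request.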
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data isn't
// processed now, execution will exit this loop and call
// release() which will recycle the processor (and input
// buffer) deleting any pipe-lined data. To avoid this,
// process it now.
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}

if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
}

if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess();
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], State after async post processing: [" + state + "]");
}
}

if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);

return state;
}
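
The loop condition at the end of AbstractProcessorLight.process mixes || and && without parentheses, so it is easy to misread. Because && binds more tightly than || in Java, it means "ASYNC_END, or (there are pending dispatches and the state is not CLOSED)". The sketch below spells that out. The class LoopConditionSketch, its reduced State enum, and the boolean hasDispatches (standing in for dispatches != null) are hypothetical simplifications.

```java
// Hypothetical restatement of the do/while condition (not Tomcat code).
class LoopConditionSketch {
    enum State { OPEN, CLOSED, LONG, ASYNC_END }

    // hasDispatches stands in for "dispatches != null" in the original loop.
    static boolean keepLooping(State state, boolean hasDispatches) {
        return state == State.ASYNC_END
                || (hasDispatches && state != State.CLOSED);
    }

    public static void main(String[] args) {
        System.out.println(keepLooping(State.ASYNC_END, false)); // true
        System.out.println(keepLooping(State.CLOSED, true));     // false
    }
}
```

In particular, a CLOSED state ends the loop even when dispatches are still pending, while ASYNC_END always triggers one more pass.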

For an OPEN_READ event, process calls the service method of the subclass Http11Processor:

@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
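// request is an org.apache.coyote.Request
// RequestInfo is a structure that holds the request and response objects
RequestInfo rp = request.getRequestProcessor();
// Parse stage
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

// Setting up the I/O: initialize the input and output buffers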
setSocketWrapper(socketWrapper);
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);

// Flags
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;

// Loop while there is no error, keepAlive is true, the request is not async,
// no protocol upgrade is pending, sendfile is done, and the protocol is not paused
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !protocol.isPaused()) {

// Parsing the request header
try {
if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
protocol.getKeepAliveTimeout())) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}

if (protocol.isPaused()) {
// response is an org.apache.coyote.Response
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!protocol.getDisableUploadTimeout()) {
socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
}
}
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
break;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
UserDataHelper.Mode logMode = userDataHelper.getNextMode();
if (logMode != null) {
String message = sm.getString("http11processor.header.parse");
switch (logMode) {
case INFO_THEN_DEBUG:
message += sm.getString("http11processor.fallToDebug");
//$FALL-THROUGH$
case INFO:
log.info(message, t);
break;
case DEBUG:
log.debug(message, t);
}
}
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}

// Has a protocol upgrade been requested?
Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
boolean foundUpgrade = false;
while (connectionValues.hasMoreElements() && !foundUpgrade) {
foundUpgrade = connectionValues.nextElement().toLowerCase(
Locale.ENGLISH).contains("upgrade");
}

if (foundUpgrade) {
// Check the protocol
String requestedProtocol = request.getHeader("Upgrade");

UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
if (upgradeProtocol != null) {
if (upgradeProtocol.accept(request)) {
//101
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
response.setHeader("Connection", "Upgrade");
response.setHeader("Upgrade", requestedProtocol);
action(ActionCode.CLOSE, null);
getAdapter().log(request, response, 0);

InternalHttpUpgradeHandler upgradeHandler =
upgradeProtocol.getInternalUpgradeHandler(
socketWrapper, getAdapter(), cloneRequest(request));
UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
action(ActionCode.UPGRADE, upgradeToken);
return SocketState.UPGRADING;
}
}
}

if (getErrorState().isIoAllowed()) {
// Setting up filters, and parse some request headers
// Prepare stage
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
// A long method that parses the request and populates the request object
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
}

int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}

// Process the request in the adapter
if (getErrorState().isIoAllowed()) {
try {
// Service stage
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
// Get the CoyoteAdapter and call its service method
getAdapter().service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if (keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
} catch (HeadersTooLargeException e) {
log.error(sm.getString("http11processor.request.process"), e);
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close"); // TODO: Remove
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}

// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync()) {
// If this is an async request then the request ends when it has
// been completed. The AsyncContext is responsible for calling
// endRequest() in that case.
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

// If there was an error, make sure the request is counted as
// an error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}

if (!isAsync() || getErrorState().isError()) {
request.updateCounters();
if (getErrorState().isIoAllowed()) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
}

if (!protocol.getDisableUploadTimeout()) {
int connectionTimeout = protocol.getConnectionTimeout();
if (connectionTimeout > 0) {
socketWrapper.setReadTimeout(connectionTimeout);
} else {
socketWrapper.setReadTimeout(0);
}
}

// Keep-alive stage
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

// Trigger sendfile processing if needed
sendfileState = processSendfile(socketWrapper);
}

// Ended stage
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

if (getErrorState().isError() || protocol.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileState == SendfileState.PENDING) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
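
The nested if/else at the end of service can be read as a priority-ordered decision: error or paused first, then async, then upgrade, then sendfile, and only then the openSocket and readComplete flags. The sketch below flattens it into a pure function for readability. The class ServiceResultSketch, its State enum, and the boolean parameters are hypothetical stand-ins for the processor's internal state.

```java
// Hypothetical flattening of the return logic at the end of service (not Tomcat code).
class ServiceResultSketch {
    enum State { OPEN, CLOSED, LONG, UPGRADING, SENDFILE }

    static State decide(boolean errorOrPaused, boolean async, boolean upgrade,
                        boolean sendfilePending, boolean openSocket, boolean readComplete) {
        if (errorOrPaused) {
            return State.CLOSED;    // error state or paused protocol
        }
        if (async) {
            return State.LONG;      // async request still in flight
        }
        if (upgrade) {
            return State.UPGRADING; // protocol upgrade in progress
        }
        if (sendfilePending) {
            return State.SENDFILE;  // sendfile has not finished yet
        }
        if (!openSocket) {
            return State.CLOSED;    // nothing left to read, close the socket
        }
        // Socket stays open: OPEN if the read finished, LONG if more input is needed.
        return readComplete ? State.OPEN : State.LONG;
    }

    public static void main(String[] args) {
        // Headers only partially read: keep the socket and wait for more data.
        System.out.println(decide(false, false, false, false, true, false));
    }
}
```

The partially read header case in the listing (openSocket = true, readComplete = false) therefore ends up as LONG, which keeps the processor associated with the socket until more data arrives.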

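Http11Processor.service is a giant method whose main job is to populate the org.apache.coyote.Request and org.apache.coyote.Response objects.
The part to focus on is the service stage, where it obtains the CoyoteAdapter and calls its service method. This CoyoteAdapter is created when the Connector is initialized.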

@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {

// org.apache.catalina.connector.Request implements javax.servlet.http.HttpServletRequest
Request request = (Request) req.getNote(ADAPTER_NOTES);
// org.apache.catalina.connector.Response implements javax.servlet.http.HttpServletResponse
Response response = (Response) res.getNote(ADAPTER_NOTES);

// Preparation: on first use, create the request and response objects
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);

// Link objects to each other
request.setResponse(response);
response.setRequest(request);

// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);

// Set query string encoding
req.getParameters().setQueryStringCharset(connector.getURICharset());
}

if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}

boolean async = false;
boolean postParseSuccess = false;

req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

try {
// Parse and set Catalina and configuration specific
// request parameters
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
// Check whether the valves support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
// If this is an async request
if (request.isAsync()) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// Possible the all data may have been read during service()
// method so this needs to be checked here
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}

Throwable throwable =
(Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

// If an async request was started, is not going to end once
// this container thread finishes and an error occurred, trigger
// the async error process
if (!request.isAsyncCompleting() && throwable != null) {
request.getAsyncContextInternal().setErrorState(throwable, true);
}
} else {
// Finish the request and response
request.finishRequest();
response.finishResponse();
}

} catch (IOException e) {
// Ignore
} finally {
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);

if (request.isAsyncCompleting() && error.get()) {
// Connection will be forcibly closed which will prevent
// completion happening at the usual point. Need to trigger
// call to onComplete() here.
res.action(ActionCode.ASYNC_POST_PROCESS, null);
async = false;
}

// Access log
if (!async && postParseSuccess) {
// Log only if processing was invoked.
// If postParseRequest() failed, it has already logged it.
Context context = request.getContext();
Host host = request.getHost();
// If the context is null, it is likely that the endpoint was
// shutdown, this connection closed and the request recycled in
// a different thread. That thread will have updated the access
// log so it is OK not to update the access log here in that
// case.
// The other possibility is that an error occurred early in
// processing and the request could not be mapped to a Context.
// Log via the host or engine in that case.
long time = System.currentTimeMillis() - req.getStartTime();
if (context != null) {
context.logAccess(request, response, time, false);
} else if (response.isError()) {
if (host != null) {
host.logAccess(request, response, time, false);
} else {
connector.getService().getContainer().logAccess(
request, response, time, false);
}
}
}

req.getRequestProcessor().setWorkerThreadName(null);

// Recycle the wrapper request and response
if (!async) {
updateWrapperErrorCount(request, response);
// Reset the objects so they can be reused
request.recycle();
response.recycle();
}
}
}

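There are two important calls here. postParseRequest parses and sets the Catalina-specific and configuration-specific request parameters.
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response) calls into the container, where the business logic actually runs.
In other words, the worker thread has finally assembled the request and response and can now invoke the Servlet's methods inside the Servlet engine.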
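
The getPipeline().getFirst().invoke(...) call is an instance of the chain-of-responsibility pattern: the caller invokes only the first valve, and each valve decides whether to pass the request on to the next. The following is a minimal sketch of that idea with hypothetical types. PipelineSketch, its Valve interface, and NamedValve are invented for illustration and are much simpler than Tomcat's real Pipeline and Valve interfaces; the engine, host, context, and wrapper names mirror Tomcat's container hierarchy only as labels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal chain of valves (not Tomcat's Pipeline/Valve API).
class PipelineSketch {
    interface Valve {
        void invoke(String request, List<String> trace);
    }

    // Records its own name, then delegates to the next valve if there is one.
    static final class NamedValve implements Valve {
        private final String name;
        private final Valve next;

        NamedValve(String name, Valve next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void invoke(String request, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(request, trace);
            }
        }
    }

    // Builds the chain back to front and invokes only its first valve.
    static List<String> run(String request) {
        Valve wrapper = new NamedValve("wrapper", null);
        Valve context = new NamedValve("context", wrapper);
        Valve host = new NamedValve("host", context);
        Valve first = new NamedValve("engine", host);
        List<String> trace = new ArrayList<>();
        first.invoke(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("GET /index.html"));
    }
}
```

The caller never needs to know how many valves exist or what they do, which is what lets the adapter hand the request to the container with a single invoke call.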

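Although the source code covered in this article is very long, it all serves one purpose: assembling the request and response and invoking the container.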