引言

上一篇分析完Acceptor,将获取的Socket封装成NioChannel注册到Poller,在注册的过程中NioChannel会封装成PollerEvent。
本篇主要看PollerEvent是怎么处理的。

Poller类实现了Runnable接口,所以主要看其run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
* 后台线程将套接字添加到轮询器,检查轮询器是否触发事件,并在事件发生时将关联的套接字交给适当的处理器。
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
//没有关闭
if (!close) {
//处理轮询器事件队列中的事件。
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
//非阻塞,只要有通道就绪就立刻返回。
keyCount = selector.selectNow();
} else {
//阻塞到至少有一个通道在你注册的事件上就绪了,最长阻塞时间为selectorTimeout毫秒。
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
//关闭
if (close) {
//处理轮询器事件队列中的事件。
events();
//超时的处理
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("", x);
continue;
}
//either we timed out or we woke up, process events first
//要么超时,要么醒来,先处理事件
if (keyCount == 0) hasEvents = (hasEvents | events());

//获取每一个SelectionKey,并处理
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
//附件,里面有数据
NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
//处理数据
processKey(sk, attachment);
}
}//while

//process timeouts
timeout(keyCount, hasEvents);
}//while
// Poller销毁,计数器减一
getStopLatch().countDown();
}

主要就是获取队列中的事件,处理事件,这里看两个方法:

events

处理轮询器事件队列中的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Processes events in the event queue of the Poller.
* 处理轮询器事件队列中的事件。
*
* @return <code>true</code> if some events were processed,
* <code>false</code> if queue was empty
*/
public boolean events() {
boolean result = false;

PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
result = true;
try {
//PollerEvent也实现了Runnable接口
pe.run();
//重置
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch (Throwable x) {
log.error("", x);
}
}

return result;
}

继续看PollerEvent的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Override
public void run() {
//注册操作
if (interestOps == OP_REGISTER) {
try {
//注册Selector,读操作
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
((NioSocketWrapper) socket.socketWrapper).closed = true;
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
//设置interest值
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {
}
}
}
}

这一步可以说是为了后面处理数据做铺垫。

processKey(sk, attachment)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if (close) {
//取消键,里面各种关闭
cancelledKey(sk);
} else if (sk.isValid() && attachment != null) {
//有效且有数据
if (sk.isReadable() || sk.isWritable()) {
//有文件时
if (attachment.getSendfileData() != null) {
processSendfile(sk, attachment, false);
} else {
//取消注册
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
//处理可读
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
//可写
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch (CancelledKeyException ckx) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("", t);
}
}

可以看出主要是调用processSocket方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Process the given SocketWrapper with the given status. Used to trigger
* processing as if the Poller (for those endpoints that have one)
* selected the socket.
*
* @param socketWrapper The socket wrapper to process
* @param event The socket event to be processed
* @param dispatch Should the processing be performed on a new
* container thread
*
* @return if processing was triggered successfully
*/
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

这里将attachment和SocketEvent封装成SocketProcessor,交给Executor(即工作线程)处理。
下一篇继续分析工作线程的操作流程。