Introduction

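This class is fairly simple: it implements the Runnable interface and overrides run().
Most of the method is state bookkeeping, so we only look at the two places that matter most: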

@Override
public void run() {

    int errorDelay = 0;

    // Loop until we receive a shutdown command
    while (endpoint.isRunning()) {
        // omitted ...

        try {
            // omitted ...

            U socket = null;
            try {
                // Accept the next incoming connection from the server
                // socket
                // Key point 1: accept the next connection
                socket = endpoint.serverSocketAccept();
            } catch (Exception ioe) {
                // We didn't get a socket
                endpoint.countDownConnection();
                if (endpoint.isRunning()) {
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                } else {
                    break;
                }
            }
            // Successful accept, reset the error delay
            errorDelay = 0;

            // Configure the socket
            if (endpoint.isRunning() && !endpoint.isPaused()) {
                // setSocketOptions() will hand the socket off to
                // an appropriate processor if successful
                // Key point 2: configure the socket and hand it off
                if (!endpoint.setSocketOptions(socket)) {
                    endpoint.closeSocket(socket);
                }
            } else {
                endpoint.destroySocket(socket);
            }
        } catch (Throwable t) {
            // omitted ...
        }
    }
    state = AcceptorState.ENDED;
}

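Some of the code is omitted here. The method itself is simple: it loops, accepting one SocketChannel per iteration, then wraps the socket in a NioChannel and registers it with a poller.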
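The accept-and-hand-off loop described above can be sketched with plain java.nio. This is only an illustration, not Tomcat's code: the class AcceptLoopSketch, its handoff queue (standing in for the poller), and the demo() helper are all hypothetical names introduced for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an acceptor thread; not Tomcat's class.
class AcceptLoopSketch implements Runnable {
    private final ServerSocketChannel serverSock;
    // Stand-in for the poller: accepted channels are simply queued here.
    private final BlockingQueue<SocketChannel> handoff;
    private volatile boolean running = true;

    AcceptLoopSketch(ServerSocketChannel serverSock, BlockingQueue<SocketChannel> handoff) {
        this.serverSock = serverSock;
        this.handoff = handoff;
    }

    void stop() throws IOException {
        running = false;
        serverSock.close(); // a thread blocked in accept() receives an exception
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Blocks until a client connects (the server channel is in blocking mode).
                SocketChannel socket = serverSock.accept();
                // Switch the accepted channel to non-blocking before handing it off.
                socket.configureBlocking(false);
                handoff.offer(socket);
            } catch (IOException e) {
                if (!running) {
                    break; // shutdown was requested
                }
                // A real acceptor would delay here before retrying.
            }
        }
    }

    // Demo: one loopback client connects; returns true if the accepted
    // channel was handed off in non-blocking mode.
    static boolean demo() throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
        BlockingQueue<SocketChannel> handoff = new LinkedBlockingQueue<>();
        AcceptLoopSketch acceptor = new AcceptLoopSketch(server, handoff);
        Thread thread = new Thread(acceptor, "acceptor-sketch");
        thread.start();
        try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
            SocketChannel accepted = handoff.poll(5, TimeUnit.SECONDS);
            boolean ok = accepted != null && !accepted.isBlocking();
            if (accepted != null) {
                accepted.close();
            }
            return ok;
        } finally {
            acceptor.stop();
            thread.join(5000);
        }
    }
}
```

The point of the split is that the accepting thread only accepts and hands off; reading and writing happen elsewhere.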

endpoint.serverSocketAccept()

@Override
protected SocketChannel serverSocketAccept() throws Exception {
    // serverSock is a ServerSocketChannel
    return serverSock.accept();
}

This simply calls accept() on the ServerSocketChannel, which returns a SocketChannel.

endpoint.setSocketOptions(socket)

/**
 * Process the specified connection.
 *
 * @param socket The socket channel
 * @return <code>true</code> if the socket was correctly configured
 *  and processing may continue, <code>false</code> if the socket needs to be
 *  closed immediately
 */
@Override
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        // disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        // Reuse a cached NioChannel if one is available, otherwise create one
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("", t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

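This method first switches the channel to non-blocking mode and applies the configured socket properties to its Socket, then wraps the SocketChannel in a NioChannel (reusing a cached one from nioChannels when available), and finally registers the NioChannel with a Poller.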
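The "pop from the cache, create only on a miss" step can be shown in isolation. The sketch below is a simplified illustration under stated assumptions: ChannelCacheSketch and its Wrapper class are hypothetical stand-ins for nioChannels and NioChannel, and the release() method is an assumption about how objects would be returned to such a cache, since the listing above does not show that side.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical object cache illustrating the reuse-or-create pattern.
class ChannelCacheSketch {
    // Hypothetical stand-in for NioChannel: wraps some I/O object.
    static final class Wrapper {
        Object io;
        int resets;

        void reset() {
            resets++; // a real wrapper would clear its buffers and state here
        }
    }

    private final Deque<Wrapper> cache = new ArrayDeque<>();
    private int created;

    // Reuse a cached wrapper if one exists, otherwise allocate a new one.
    synchronized Wrapper acquire(Object io) {
        Wrapper w = cache.pollFirst(); // returns null when the cache is empty
        if (w == null) {
            w = new Wrapper();
            created++;
        } else {
            w.reset();
        }
        w.io = io;
        return w;
    }

    // Assumed counterpart: hand a wrapper back once its connection is closed.
    synchronized void release(Wrapper w) {
        w.io = null;
        cache.offerFirst(w);
    }

    synchronized int createdCount() {
        return created;
    }
}
```

Reuse of this kind avoids allocating a fresh buffer handler for every connection, which is presumably why the cache is consulted before anything is constructed.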
Next we look at how the Poller is obtained and how registration works.

getPoller0()

/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

/**
 * The socket pollers.
 */
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);

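This returns one Poller, picked from the pollers array in round-robin order using an atomically incremented counter.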
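The counter-modulo-length idea can be tried out on its own. The sketch below uses a hypothetical RoundRobinSketch class whose starting counter value can be set, so the wrap-around case is easy to reach. It also shows a general property of Math.abs worth knowing when reading this pattern: Math.abs(Integer.MIN_VALUE) is still negative, so the index can go negative once the counter overflows, whereas Math.floorMod always yields a value in range. The floorMod variant is an alternative shown for comparison, not something taken from the listing above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper illustrating round-robin index selection.
class RoundRobinSketch {
    private final int length;
    private final AtomicInteger rotater;

    RoundRobinSketch(int length, int start) {
        this.length = length;
        this.rotater = new AtomicInteger(start);
    }

    // Same shape as the expression in getPoller0().
    int nextWithAbs() {
        return Math.abs(rotater.incrementAndGet()) % length;
    }

    // Alternative: floorMod never returns a negative index for a positive length.
    int nextWithFloorMod() {
        return Math.floorMod(rotater.incrementAndGet(), length);
    }

    public static void main(String[] args) {
        RoundRobinSketch normal = new RoundRobinSketch(3, 0);
        // Prints 1, 2, 0, 1: each call moves to the next slot.
        for (int i = 0; i < 4; i++) {
            System.out.println(normal.nextWithAbs());
        }
        // Counter overflows from Integer.MAX_VALUE to Integer.MIN_VALUE.
        System.out.println(new RoundRobinSketch(3, Integer.MAX_VALUE).nextWithAbs());      // -2
        System.out.println(new RoundRobinSketch(3, Integer.MAX_VALUE).nextWithFloorMod()); // 1
    }
}
```

Reaching the overflow takes more than two billion accepted connections, so this is an edge case rather than an everyday concern.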

The Poller's register method

/**
 * Registers a newly created socket with the poller.
 *
 * @param socket The newly created socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if (r == null) r = new PollerEvent(socket, ka, OP_REGISTER);
    else r.reset(socket, ka, OP_REGISTER);
    addEvent(r);
}

private void addEvent(PollerEvent event) {
    events.offer(event);
    if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
}

private final SynchronizedQueue<PollerEvent> events =
        new SynchronizedQueue<>();

In the end, the NioChannel is passed to the Poller's register method, wrapped in a PollerEvent, and placed on the Poller's event queue.
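The queue-then-wake-up handoff in addEvent can be sketched with a plain java.nio Selector. Everything here is illustrative: EventQueueSketch is a hypothetical class, events are plain Runnables instead of PollerEvents, and a ConcurrentLinkedQueue replaces Tomcat's SynchronizedQueue. The consumer side (pollOnce) is an assumption about how such a counter could be used, namely that the polling thread sets it to -1 just before blocking in select(), so that the first producer to increment it to 0 knows a wakeup is needed.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of handing events to a selector thread.
class EventQueueSketch {
    private final Selector selector;
    private final ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wakeupCounter = new AtomicInteger(0);

    EventQueueSketch() throws IOException {
        this.selector = Selector.open();
    }

    // Producer side, same shape as addEvent() above.
    void addEvent(Runnable event) {
        events.offer(event);
        // The counter reaches 0 only if the poller had set it to -1 before blocking.
        if (wakeupCounter.incrementAndGet() == 0) {
            selector.wakeup();
        }
    }

    // Consumer side (assumed): one iteration of a polling loop.
    // Returns the number of queued events that were run.
    int pollOnce(long timeoutMillis) throws IOException {
        if (wakeupCounter.getAndSet(-1) > 0) {
            selector.selectNow(); // events are already pending, do not block
        } else {
            selector.select(timeoutMillis); // block until I/O, wakeup(), or timeout
        }
        wakeupCounter.set(0);
        int handled = 0;
        Runnable event;
        while ((event = events.poll()) != null) {
            event.run();
            handled++;
        }
        return handled;
    }

    void close() throws IOException {
        selector.close();
    }
}
```

With this arrangement the producer pays for a wakeup() call only when the polling thread is actually parked in select(); otherwise the event just waits in the queue for the next iteration.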
The next post looks at how the Poller processes these PollerEvents.