/** * The background thread that adds sockets to the Poller, checks the * poller for triggered events and hands the associated socket off to an * appropriate processor as events occur. * 后台线程将套接字添加到轮询器,检查轮询器是否触发事件,并在事件发生时将关联的套接字交给适当的处理器。 */ @Override publicvoidrun() { // Loop until destroy() is called while (true) { booleanhasEvents=false; try { //没有关闭 if (!close) { //处理轮询器事件队列中的事件。 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select //非阻塞,只要有通道就绪就立刻返回。 keyCount = selector.selectNow(); } else { //阻塞到至少有一个通道在你注册的事件上就绪了,最长阻塞时间为selectorTimeout毫秒。 keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } //关闭 if (close) { //处理轮询器事件队列中的事件。 events(); //超时的处理 timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("", x); continue; } //either we timed out or we woke up, process events first //要么超时,要么醒来,先处理事件 if (keyCount == 0) hasEvents = (hasEvents | events());
//获取每一个SelectionKey,并处理 Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKeysk= iterator.next(); //附件,里面有数据 NioSocketWrapperattachment= (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); //处理数据 processKey(sk, attachment); } }//while
/** * Processes events in the event queue of the Poller. * 处理轮询器事件队列中的事件。 * * @return <code>true</code> if some events were processed, * <code>false</code> if queue was empty */ publicbooleanevents() { booleanresult=false;
PollerEventpe=null; for (inti=0, size = events.size(); i < size && (pe = events.poll()) != null; i++) { result = true; try { //PollerEvent也实现了Runnable接口 pe.run(); //重置 pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch (Throwable x) { log.error("", x); } }
@Override publicvoidrun() { //注册操作 if (interestOps == OP_REGISTER) { try { //注册Selector,读操作 socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { finalSelectionKeykey= socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed. socket.socketWrapper.getEndpoint().countDownConnection(); ((NioSocketWrapper) socket.socketWrapper).closed = true; } else { finalNioSocketWrappersocketWrapper= (NioSocketWrapper) key.attachment(); if (socketWrapper != null) { //we are registering the key to start with, reset the fairness counter. intops= key.interestOps() | interestOps; socketWrapper.interestOps(ops); //设置interest值 key.interestOps(ops); } else { socket.getPoller().cancelledKey(key); } } } catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key); } catch (Exception ignore) { } } } }
/** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */ publicbooleanprocessSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { returnfalse; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executorexecutor= getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); returnfalse; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); returnfalse; } returntrue; }