我们在Netty学习系列五的最后提出了一些问题还没得到回答,今天来通过学习NioServerSocketChannel的源码来帮我们找到之前问题的答案。
先看一下NioServerSocketChannel的继承结构。
AttributeMap接口及DefaultAttributeMap主要是提供了体检属性和获取属性的能力,便于我们为Channel绑定额外的属性。
AbstractChannel实现了Channel接口,实现了Channel通用的行为和方法,我们在Netty学习系列四中已经介绍过了。
AbstractNioChannel抽象类关联了Channel接口与JDK的NIOChannel,也就是让底层的通信交给Nio来实现。
简单介绍下源码:
1 public abstract class AbstractNioChannel extends AbstractChannel { 2 3 private static final InternalLogger logger = 4 InternalLoggerFactory.getInstance(AbstractNioChannel.class); 5 6 private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( 7 new ClosedChannelException(), AbstractNioChannel.class, "doClose()"); 8 9 //和Java NIO的Channel绑定 10 private final SelectableChannel ch; 11 //为SelectableChannel注册的时间 12 protected final int readInterestOp; 13 volatile SelectionKey selectionKey; 14 15 boolean readPending; 16 17 private final Runnable clearReadPendingRunnable = new Runnable() { 18 @Override 19 public void run() { 20 clearReadPending0(); 21 } 22 }; 23 24 /** 25 * The future of the current connection attempt. If not null, subsequent 26 * connection attempts will fail. 27 */ 28 private ChannelPromise connectPromise; 29 private ScheduledFuture<?> connectTimeoutFuture; 30 private SocketAddress requestedRemoteAddress; 31 32 //构造函数,参数分别为父Channel,要封装的SelectableChannel和注册的感兴趣的事件 33 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { 34 super(parent); 35 this.ch = ch; 36 this.readInterestOp = readInterestOp; 37 try { 38 //将SelectableChannel设置为非阻塞 39 ch.configureBlocking(false); 40 } catch (IOException e) { 41 try { 42 ch.close(); 43 } catch (IOException e2) { 44 if (logger.isWarnEnabled()) { 45 logger.warn( 46 "Failed to close a partially initialized socket.", e2); 47 } 48 } 49 50 throw new ChannelException("Failed to enter non-blocking mode.", e); 51 } 52 } 53 54 //通道是否打开 55 @Override 56 public boolean isOpen() { 57 return ch.isOpen(); 58 } 59 60 //返回更具体的Unsafe子类 61 @Override 62 public NioUnsafe unsafe() { 63 return (NioUnsafe) super.unsafe(); 64 } 65 66 //返回内部封装的SelectableChannel 67 protected SelectableChannel javaChannel() { 68 return ch; 69 } 70 71 //返回EventLoop更具体的子类 72 @Override 73 public NioEventLoop eventLoop() { 74 return (NioEventLoop) super.eventLoop(); 75 } 76 77 //返回SelectionKey 78 protected SelectionKey selectionKey() { 79 assert selectionKey != null; 80 return selectionKey; 81 } 82 83 //已废弃方法 84 @Deprecated 85 protected boolean isReadPending() { 86 return readPending; 87 } 88 89 //已废弃方法 90 @Deprecated 91 protected void setReadPending(final boolean readPending) { 92 if (isRegistered()) { 93 EventLoop eventLoop = eventLoop(); 94 if (eventLoop.inEventLoop()) { 95 setReadPending0(readPending); 96 } else { 97 eventLoop.execute(new Runnable() { 98 @Override 99 public void run() { 100 setReadPending0(readPending); 101 } 102 }); 103 } 104 } else { 105 // Best effort if we are not registered yet clear readPending. 106 // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is 107 // not set yet so it would produce an assertion failure. 108 this.readPending = readPending; 109 } 110 } 111 112 /** 113 * Set read pending to {@code false}. 114 */ 115 protected final void clearReadPending() { 116 if (isRegistered()) { 117 EventLoop eventLoop = eventLoop(); 118 if (eventLoop.inEventLoop()) { 119 clearReadPending0(); 120 } else { 121 eventLoop.execute(clearReadPendingRunnable); 122 } 123 } else { 124 // Best effort if we are not registered yet clear readPending. This happens during channel initialization. 125 // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is 126 // not set yet so it would produce an assertion failure. 127 readPending = false; 128 } 129 } 130 131 private void setReadPending0(boolean readPending) { 132 this.readPending = readPending; 133 if (!readPending) { 134 ((AbstractNioUnsafe) unsafe()).removeReadOp(); 135 } 136 } 137 138 private void clearReadPending0() { 139 readPending = false; 140 ((AbstractNioUnsafe) unsafe()).removeReadOp(); 141 } 142 143 //Unsafe的具体子类,增加了一些和NioChannel相关的特性 144 public interface NioUnsafe extends Unsafe { 145 //返回内部的SelectableChannel 146 SelectableChannel ch(); 147 148 //连接完成 149 void finishConnect(); 150 151 //读方法 152 void read(); 153 154 //强制刷新 155 void forceFlush(); 156 } 157 158 //NioUnsafe的抽象实现 159 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { 160 161 protected final void removeReadOp() { 162 SelectionKey key = selectionKey(); 163 // Check first if the key is still valid as it may be canceled as part of the deregistration 164 // from the EventLoop 165 // See https://github.com/netty/netty/issues/2104 166 if (!key.isValid()) { 167 return; 168 } 169 int interestOps = key.interestOps(); 170 if ((interestOps & readInterestOp) != 0) { 171 // only remove readInterestOp if needed 172 key.interestOps(interestOps & ~readInterestOp); 173 } 174 } 175 176 //返回内部封装的Channel 177 @Override 178 public final SelectableChannel ch() { 179 return javaChannel(); 180 } 181 182 //connect方法,实际在使用时NioServerSocket是不支持connect的,但是NioSocket会支持 183 @Override 184 public final void connect( 185 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { 186 if (!promise.setUncancellable() || !ensureOpen(promise)) { 187 return; 188 } 189 190 try { 191 if (connectPromise != null) { 192 // Already a connect in process. 193 throw new ConnectionPendingException(); 194 } 195 196 boolean wasActive = isActive(); 197 //调用具体子类的doConnect方法 198 if (doConnect(remoteAddress, localAddress)) { 199 //连接成功设置fulfillConnectPromise 200 fulfillConnectPromise(promise, wasActive); 201 } else { 202 //连接未成功 203 connectPromise = promise; 204 requestedRemoteAddress = remoteAddress; 205 206 //根据配置的超时时间,设置超时任务,一旦到达超时时间则抛出连接失败的异常 207 int connectTimeoutMillis = config().getConnectTimeoutMillis(); 208 if (connectTimeoutMillis > 0) { 209 connectTimeoutFuture = eventLoop().schedule(new Runnable() { 210 @Override 211 public void run() { 212 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; 213 ConnectTimeoutException cause = 214 new ConnectTimeoutException("connection timed out: " + remoteAddress); 215 if (connectPromise != null && connectPromise.tryFailure(cause)) { 216 close(voidPromise()); 217 } 218 } 219 }, connectTimeoutMillis, TimeUnit.MILLISECONDS); 220 } 221 222 //添加监听器,如果期间操作成功了,则取消掉超超时任务 223 promise.addListener(new ChannelFutureListener() { 224 @Override 225 public void operationComplete(ChannelFuture future) throws Exception { 226 if (future.isCancelled()) { 227 if (connectTimeoutFuture != null) { 228 connectTimeoutFuture.cancel(false); 229 } 230 connectPromise = null; 231 close(voidPromise()); 232 } 233 } 234 }); 235 } 236 } catch (Throwable t) { 237 //运行出现异常,则设置Promise为失败 238 promise.tryFailure(annotateConnectException(t, remoteAddress)); 239 closeIfClosed(); 240 } 241 } 242 243 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { 244 if (promise == null) { 245 // Closed via cancellation and the promise has been notified already. 246 return; 247 } 248 249 // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. 250 // We still need to ensure we call fireChannelActive() in this case. 251 boolean active = isActive(); 252 253 // trySuccess() will return false if a user cancelled the connection attempt. 254 boolean promiseSet = promise.trySuccess(); 255 256 257 //active状态发生改变,现在已经连接成功 258 if (!wasActive && active) { 259 //pipeline产生Active事件在通道中流传 260 pipeline().fireChannelActive(); 261 } 262 263 // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). 264 if (!promiseSet) { 265 close(voidPromise()); 266 } 267 } 268 269 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { 270 if (promise == null) { 271 // Closed via cancellation and the promise has been notified already. 272 return; 273 } 274 275 // Use tryFailure() instead of setFailure() to avoid the race against cancel(). 276 promise.tryFailure(cause); 277 closeIfClosed(); 278 } 279 280 //连接完成,该方法会在连接成功后,由EventLoop调用 281 @Override 282 public final void finishConnect() { 283 284 assert eventLoop().inEventLoop(); 285 286 try { 287 boolean wasActive = isActive(); 288 doFinishConnect(); 289 fulfillConnectPromise(connectPromise, wasActive); 290 } catch (Throwable t) { 291 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); 292 } finally { 293 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used 294 // See https://github.com/netty/netty/issues/1770 295 if (connectTimeoutFuture != null) { 296 connectTimeoutFuture.cancel(false); 297 } 298 connectPromise = null; 299 } 300 } 301 302 @Override 303 protected final void flush0() { 304 // Flush immediately only when there's no pending flush. 305 // If there's a pending flush operation, event loop will call forceFlush() later, 306 // and thus there's no need to call it now. 307 if (!isFlushPending()) { 308 super.flush0(); 309 } 310 } 311 312 @Override 313 public final void forceFlush() { 314 // directly call super.flush0() to force a flush now 315 super.flush0(); 316 } 317 318 private boolean isFlushPending() { 319 SelectionKey selectionKey = selectionKey(); 320 return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; 321 } 322 } 323 324 //判断EventLoop和Channel是否匹配 325 @Override 326 protected boolean isCompatible(EventLoop loop) { 327 return loop instanceof NioEventLoop; 328 } 329 330 //注册 331 @Override 332 protected void doRegister() throws Exception { 333 boolean selected = false; 334 for (;;) { 335 try { 336 //让内部的javaChannel先注册的interestOps为0 337 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 338 return; 339 } catch (CancelledKeyException e) { 340 if (!selected) { 341 // Force the Selector to select now as the "canceled" SelectionKey may still be 342 // cached and not removed because no Select.select(..) operation was called yet. 343 eventLoop().selectNow(); 344 selected = true; 345 } else { 346 // We forced a select operation on the selector before but the SelectionKey is still cached 347 // for whatever reason. JDK bug ? 348 throw e; 349 } 350 } 351 } 352 } 353 354 @Override 355 protected void doDeregister() throws Exception { 356 eventLoop().cancel(selectionKey()); 357 } 358 359 //doBeginRead由read方法调用 360 @Override 361 protected void doBeginRead() throws Exception { 362 final SelectionKey selectionKey = this.selectionKey; 363 if (!selectionKey.isValid()) { 364 return; 365 } 366 367 readPending = true; 368 //重新注册感兴趣的事件 369 final int interestOps = selectionKey.interestOps(); 370 if ((interestOps & readInterestOp) == 0) { 371 selectionKey.interestOps(interestOps | readInterestOp); 372 } 373 } 374 375 /** 376 * Connect to the remote peer 377 */ 378 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; 379 380 /** 381 * Finish the connect 382 */ 383 protected abstract void doFinishConnect() throws Exception; 384 385 //分配直接内存 386 protected final ByteBuf newDirectBuffer(ByteBuf buf) { 387 final int readableBytes = buf.readableBytes(); 388 if (readableBytes == 0) { 389 ReferenceCountUtil.safeRelease(buf); 390 return Unpooled.EMPTY_BUFFER; 391 } 392 393 final ByteBufAllocator alloc = alloc(); 394 if (alloc.isDirectBufferPooled()) { 395 ByteBuf directBuf = alloc.directBuffer(readableBytes); 396 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 397 ReferenceCountUtil.safeRelease(buf); 398 return directBuf; 399 } 400 401 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); 402 if (directBuf != null) { 403 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 404 ReferenceCountUtil.safeRelease(buf); 405 return directBuf; 406 } 407 408 // Allocating and deallocating an unpooled direct buffer is very expensive; give up. 409 return buf; 410 } 411 412 //分配直接内存 413 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) { 414 final int readableBytes = buf.readableBytes(); 415 if (readableBytes == 0) { 416 ReferenceCountUtil.safeRelease(holder); 417 return Unpooled.EMPTY_BUFFER; 418 } 419 420 final ByteBufAllocator alloc = alloc(); 421 if (alloc.isDirectBufferPooled()) { 422 ByteBuf directBuf = alloc.directBuffer(readableBytes); 423 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 424 ReferenceCountUtil.safeRelease(holder); 425 return directBuf; 426 } 427 428 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); 429 if (directBuf != null) { 430 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 431 ReferenceCountUtil.safeRelease(holder); 432 return directBuf; 433 } 434 435 // Allocating and deallocating an unpooled direct buffer is very expensive; give up. 436 if (holder != buf) { 437 // Ensure to call holder.release() to give the holder a chance to release other resources than its content. 438 buf.retain(); 439 ReferenceCountUtil.safeRelease(holder); 440 } 441 442 return buf; 443 } 444 445 //关闭方法 446 @Override 447 protected void doClose() throws Exception { 448 ChannelPromise promise = connectPromise; 449 if (promise != null) { 450 // Use tryFailure() instead of setFailure() to avoid the race against cancel(). 451 promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); 452 connectPromise = null; 453 } 454 455 ScheduledFuture<?> future = connectTimeoutFuture; 456 if (future != null) { 457 future.cancel(false); 458 connectTimeoutFuture = null; 459 } 460 } 461 }