先看一下NioServerSocketChannel的继承结构。
AttributeMap接口及DefaultAttributeMap主要是提供了添加属性和获取属性的能力,便于我们为Channel绑定额外的属性。
AbstractChannel实现了Channel接口,实现了Channel通用的行为和方法,我们在Netty学习系列四中已经介绍过了。
AbstractNioChannel抽象类关联了Channel接口与JDK的NIOChannel,也就是让底层的通信交给Nio来实现。
简单介绍下源码:
// AbstractNioChannel: base class for channels backed by a JDK SelectableChannel; it wraps that channel,
// keeps its SelectionKey and read interest op, and implements connect/register/beginRead/close plumbing.
1 public abstract class AbstractNioChannel extends AbstractChannel {
2
3 private static final InternalLogger logger =
4 InternalLoggerFactory.getInstance(AbstractNioChannel.class);
5
6 private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
7 new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
8
9 // The underlying Java NIO channel wrapped by this Netty channel
10 private final SelectableChannel ch;
11 // The interest op (event) that doBeginRead() adds to the SelectionKey for reading
12 protected final int readInterestOp;
13 volatile SelectionKey selectionKey;
14
15 boolean readPending;
16
17 private final Runnable clearReadPendingRunnable = new Runnable() {
18 @Override
19 public void run() {
20 clearReadPending0();
21 }
22 };
23
24 /**
25 * The future of the current connection attempt. If not null, subsequent
26 * connection attempts will fail.
27 */
28 private ChannelPromise connectPromise;
29 private ScheduledFuture<?> connectTimeoutFuture;
30 private SocketAddress requestedRemoteAddress;
31
32 // Constructor: takes the parent Channel, the SelectableChannel to wrap, and the interest op used for reads
33 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
34 super(parent);
35 this.ch = ch;
36 this.readInterestOp = readInterestOp;
37 try {
38 // Put the SelectableChannel into non-blocking mode
39 ch.configureBlocking(false);
40 } catch (IOException e) {
41 try {
42 ch.close();
43 } catch (IOException e2) {
44 if (logger.isWarnEnabled()) {
45 logger.warn(
46 "Failed to close a partially initialized socket.", e2);
47 }
48 }
49
50 throw new ChannelException("Failed to enter non-blocking mode.", e);
51 }
52 }
53
54 // Whether the channel is open (delegates to the wrapped SelectableChannel)
55 @Override
56 public boolean isOpen() {
57 return ch.isOpen();
58 }
59
60 // Returns the Unsafe narrowed to the more specific NioUnsafe type
61 @Override
62 public NioUnsafe unsafe() {
63 return (NioUnsafe) super.unsafe();
64 }
65
66 // Returns the wrapped SelectableChannel
67 protected SelectableChannel javaChannel() {
68 return ch;
69 }
70
71 // Returns the EventLoop narrowed to the more specific NioEventLoop type
72 @Override
73 public NioEventLoop eventLoop() {
74 return (NioEventLoop) super.eventLoop();
75 }
76
77 // Returns the SelectionKey (asserts that it has already been set)
78 protected SelectionKey selectionKey() {
79 assert selectionKey != null;
80 return selectionKey;
81 }
82
83 // Deprecated method
84 @Deprecated
85 protected boolean isReadPending() {
86 return readPending;
87 }
88
89 // Deprecated method
90 @Deprecated
91 protected void setReadPending(final boolean readPending) {
92 if (isRegistered()) {
93 EventLoop eventLoop = eventLoop();
94 if (eventLoop.inEventLoop()) {
95 setReadPending0(readPending);
96 } else {
97 eventLoop.execute(new Runnable() {
98 @Override
99 public void run() {
100 setReadPending0(readPending);
101 }
102 });
103 }
104 } else {
105 // Best effort if we are not registered yet clear readPending.
106 // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
107 // not set yet so it would produce an assertion failure.
108 this.readPending = readPending;
109 }
110 }
111
112 /**
113 * Set read pending to {@code false}.
114 */
115 protected final void clearReadPending() {
116 if (isRegistered()) {
117 EventLoop eventLoop = eventLoop();
118 if (eventLoop.inEventLoop()) {
119 clearReadPending0();
120 } else {
121 eventLoop.execute(clearReadPendingRunnable);
122 }
123 } else {
124 // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
125 // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
126 // not set yet so it would produce an assertion failure.
127 readPending = false;
128 }
129 }
130
131 private void setReadPending0(boolean readPending) {
132 this.readPending = readPending;
133 if (!readPending) {
134 ((AbstractNioUnsafe) unsafe()).removeReadOp();
135 }
136 }
137
138 private void clearReadPending0() {
139 readPending = false;
140 ((AbstractNioUnsafe) unsafe()).removeReadOp();
141 }
142
143 // NIO-specific Unsafe sub-interface that adds operations related to the NIO channel
144 public interface NioUnsafe extends Unsafe {
145 // Returns the underlying SelectableChannel
146 SelectableChannel ch();
147
148 // Finishes a connect operation
149 void finishConnect();
150
151 // Read operation
152 void read();
153
154 // Forces a flush
155 void forceFlush();
156 }
157
158 // Abstract base implementation of NioUnsafe
159 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
160
161 protected final void removeReadOp() {
162 SelectionKey key = selectionKey();
163 // Check first if the key is still valid as it may be canceled as part of the deregistration
164 // from the EventLoop
165 // See https://github.com/netty/netty/issues/2104
166 if (!key.isValid()) {
167 return;
168 }
169 int interestOps = key.interestOps();
170 if ((interestOps & readInterestOp) != 0) {
171 // only remove readInterestOp if needed
172 key.interestOps(interestOps & ~readInterestOp);
173 }
174 }
175
176 // Returns the wrapped channel
177 @Override
178 public final SelectableChannel ch() {
179 return javaChannel();
180 }
181
182 // connect: per the original note, NioServerSocketChannel does not support connect while NioSocketChannel does
183 @Override
184 public final void connect(
185 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
186 if (!promise.setUncancellable() || !ensureOpen(promise)) {
187 return;
188 }
189
190 try {
191 if (connectPromise != null) {
192 // Already a connect in process.
193 throw new ConnectionPendingException();
194 }
195
196 boolean wasActive = isActive();
197 // Delegate to the concrete subclass's doConnect implementation
198 if (doConnect(remoteAddress, localAddress)) {
199 // Connected immediately: complete the promise via fulfillConnectPromise
200 fulfillConnectPromise(promise, wasActive);
201 } else {
202 // Not connected yet: remember the promise and the requested remote address
203 connectPromise = promise;
204 requestedRemoteAddress = remoteAddress;
205
206 // If a positive connect timeout is configured, schedule a task that fails the promise with ConnectTimeoutException and closes the channel
207 int connectTimeoutMillis = config().getConnectTimeoutMillis();
208 if (connectTimeoutMillis > 0) {
209 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
210 @Override
211 public void run() {
212 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
213 ConnectTimeoutException cause =
214 new ConnectTimeoutException("connection timed out: " + remoteAddress);
215 if (connectPromise != null && connectPromise.tryFailure(cause)) {
216 close(voidPromise());
217 }
218 }
219 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
220 }
221
222 // Add a listener: if the promise gets cancelled, cancel the timeout task, clear connectPromise and close the channel
223 promise.addListener(new ChannelFutureListener() {
224 @Override
225 public void operationComplete(ChannelFuture future) throws Exception {
226 if (future.isCancelled()) {
227 if (connectTimeoutFuture != null) {
228 connectTimeoutFuture.cancel(false);
229 }
230 connectPromise = null;
231 close(voidPromise());
232 }
233 }
234 });
235 }
236 } catch (Throwable t) {
237 // On any exception, mark the promise as failed
238 promise.tryFailure(annotateConnectException(t, remoteAddress));
239 closeIfClosed();
240 }
241 }
242
243 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
244 if (promise == null) {
245 // Closed via cancellation and the promise has been notified already.
246 return;
247 }
248
249 // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
250 // We still need to ensure we call fireChannelActive() in this case.
251 boolean active = isActive();
252
253 // trySuccess() will return false if a user cancelled the connection attempt.
254 boolean promiseSet = promise.trySuccess();
255
256
257 // The active state changed from inactive to active: the connection is now established
258 if (!wasActive && active) {
259 // Fire the channelActive event through the pipeline
260 pipeline().fireChannelActive();
261 }
262
263 // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
264 if (!promiseSet) {
265 close(voidPromise());
266 }
267 }
268
269 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
270 if (promise == null) {
271 // Closed via cancellation and the promise has been notified already.
272 return;
273 }
274
275 // Use tryFailure() instead of setFailure() to avoid the race against cancel().
276 promise.tryFailure(cause);
277 closeIfClosed();
278 }
279
280 // Finishes a pending connect; must run on the event loop thread (see the assert below)
281 @Override
282 public final void finishConnect() {
283
284 assert eventLoop().inEventLoop();
285
286 try {
287 boolean wasActive = isActive();
288 doFinishConnect();
289 fulfillConnectPromise(connectPromise, wasActive);
290 } catch (Throwable t) {
291 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
292 } finally {
293 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
294 // See https://github.com/netty/netty/issues/1770
295 if (connectTimeoutFuture != null) {
296 connectTimeoutFuture.cancel(false);
297 }
298 connectPromise = null;
299 }
300 }
301
302 @Override
303 protected final void flush0() {
304 // Flush immediately only when there's no pending flush.
305 // If there's a pending flush operation, event loop will call forceFlush() later,
306 // and thus there's no need to call it now.
307 if (!isFlushPending()) {
308 super.flush0();
309 }
310 }
311
312 @Override
313 public final void forceFlush() {
314 // directly call super.flush0() to force a flush now
315 super.flush0();
316 }
317
318 private boolean isFlushPending() {
319 SelectionKey selectionKey = selectionKey();
320 return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
321 }
322 }
323
324 // Checks whether the given EventLoop is compatible with this channel (it must be a NioEventLoop)
325 @Override
326 protected boolean isCompatible(EventLoop loop) {
327 return loop instanceof NioEventLoop;
328 }
329
330 // Registration
331 @Override
332 protected void doRegister() throws Exception {
333 boolean selected = false;
334 for (;;) {
335 try {
336 // Register the wrapped javaChannel with interestOps 0 first
337 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
338 return;
339 } catch (CancelledKeyException e) {
340 if (!selected) {
341 // Force the Selector to select now as the "canceled" SelectionKey may still be
342 // cached and not removed because no Select.select(..) operation was called yet.
343 eventLoop().selectNow();
344 selected = true;
345 } else {
346 // We forced a select operation on the selector before but the SelectionKey is still cached
347 // for whatever reason. JDK bug ?
348 throw e;
349 }
350 }
351 }
352 }
353
354 @Override
355 protected void doDeregister() throws Exception {
356 eventLoop().cancel(selectionKey());
357 }
358
359 // doBeginRead: per the original note, invoked from the read method (caller not shown in this listing)
360 @Override
361 protected void doBeginRead() throws Exception {
362 final SelectionKey selectionKey = this.selectionKey;
363 if (!selectionKey.isValid()) {
364 return;
365 }
366
367 readPending = true;
368 // Add readInterestOp to the key's interest ops if it is not set yet
369 final int interestOps = selectionKey.interestOps();
370 if ((interestOps & readInterestOp) == 0) {
371 selectionKey.interestOps(interestOps | readInterestOp);
372 }
373 }
374
375 /**
376 * Connect to the remote peer
377 */
378 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
379
380 /**
381 * Finish the connect
382 */
383 protected abstract void doFinishConnect() throws Exception;
384
385 // Copies buf's readable bytes into a direct buffer (pooled or thread-local) and releases buf; returns buf unchanged if neither is available
386 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
387 final int readableBytes = buf.readableBytes();
388 if (readableBytes == 0) {
389 ReferenceCountUtil.safeRelease(buf);
390 return Unpooled.EMPTY_BUFFER;
391 }
392
393 final ByteBufAllocator alloc = alloc();
394 if (alloc.isDirectBufferPooled()) {
395 ByteBuf directBuf = alloc.directBuffer(readableBytes);
396 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
397 ReferenceCountUtil.safeRelease(buf);
398 return directBuf;
399 }
400
401 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
402 if (directBuf != null) {
403 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
404 ReferenceCountUtil.safeRelease(buf);
405 return directBuf;
406 }
407
408 // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
409 return buf;
410 }
411
412 // Same as above, but releases the given holder (which may differ from buf) instead of buf itself
413 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
414 final int readableBytes = buf.readableBytes();
415 if (readableBytes == 0) {
416 ReferenceCountUtil.safeRelease(holder);
417 return Unpooled.EMPTY_BUFFER;
418 }
419
420 final ByteBufAllocator alloc = alloc();
421 if (alloc.isDirectBufferPooled()) {
422 ByteBuf directBuf = alloc.directBuffer(readableBytes);
423 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
424 ReferenceCountUtil.safeRelease(holder);
425 return directBuf;
426 }
427
428 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
429 if (directBuf != null) {
430 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
431 ReferenceCountUtil.safeRelease(holder);
432 return directBuf;
433 }
434
435 // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
436 if (holder != buf) {
437 // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
438 buf.retain();
439 ReferenceCountUtil.safeRelease(holder);
440 }
441
442 return buf;
443 }
444
445 // Close: fails any pending connect promise and cancels the connect-timeout task
446 @Override
447 protected void doClose() throws Exception {
448 ChannelPromise promise = connectPromise;
449 if (promise != null) {
450 // Use tryFailure() instead of setFailure() to avoid the race against cancel().
451 promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
452 connectPromise = null;
453 }
454
455 ScheduledFuture<?> future = connectTimeoutFuture;
456 if (future != null) {
457 future.cancel(false);
458 connectTimeoutFuture = null;
459 }
460 }
461 }
AbstractNioChannel又有两个子类,分别是AbstractNioMessageChannel和AbstractNioByteChannel。两者的区别是前者的通道中封装处理的是Object,而后者的通道中封装处理的是ByteBuf(或FileRegion)。
对于NioServerSocketChannel而言,需要处理的是NioSocketChannel。因此它继承了AbstractNioMessageChannel。
AbstractNioMessageChannel源码:
// AbstractNioMessageChannel: AbstractNioChannel variant whose read/write paths operate on message objects
// (a List<Object> read buffer) and delegate the actual I/O to doReadMessages / doWriteMessage.
1 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
2 boolean inputShutdown;
3
4 // Constructor
5 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
6 // Pass the parent Channel, the wrapped JDK channel and the read interest op to the superclass
7 super(parent, ch, readInterestOp);
8 }
9
10 // Creates the Unsafe object (a NioMessageUnsafe)
11 @Override
12 protected AbstractNioUnsafe newUnsafe() {
13 return new NioMessageUnsafe();
14 }
15
16 // Begin read: does nothing once input has been shut down
17 @Override
18 protected void doBeginRead() throws Exception {
19 if (inputShutdown) {
20 return;
21 }
22 super.doBeginRead();
23 }
24
25 // AbstractNioUnsafe implementation whose read() collects messages into a List<Object>
26 private final class NioMessageUnsafe extends AbstractNioUnsafe {
27
28 private final List<Object> readBuf = new ArrayList<Object>();
29
30 @Override
31 public void read() {
32 assert eventLoop().inEventLoop();
33 final ChannelConfig config = config();
34 final ChannelPipeline pipeline = pipeline();
35 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
36 allocHandle.reset(config);
37
38 boolean closed = false;
39 Throwable exception = null;
40
41
42 try {
43 try {
44 // Read loop: delegates to the subclass's doReadMessages, which adds what it reads to readBuf
45 do {
46 int localRead = doReadMessages(readBuf);
47 if (localRead == 0) {
48 break;
49 }
50 if (localRead < 0) {
51 closed = true;
52 break;
53 }
54
55 allocHandle.incMessagesRead(localRead);
56 } while (allocHandle.continueReading());
57 } catch (Throwable t) {
58 exception = t;
59 }
60
61 // Fire each message in readBuf through the pipeline so the handlers can process it
62 int size = readBuf.size();
63 for (int i = 0; i < size; i ++) {
64 readPending = false;
65 pipeline.fireChannelRead(readBuf.get(i));
66 }
67 // Clear the buffer
68 readBuf.clear();
69 // Reading finished: fire the channelReadComplete event
70 allocHandle.readComplete();
71 pipeline.fireChannelReadComplete();
72
73 // If an exception occurred, fire the exceptionCaught event
74 if (exception != null) {
75 closed = closeOnReadError(exception);
76
77 pipeline.fireExceptionCaught(exception);
78 }
79
80 // If closed, mark input as shut down and close the channel if it is still open
81 if (closed) {
82 inputShutdown = true;
83 if (isOpen()) {
84 close(voidPromise());
85 }
86 }
87 } finally {
88 // Check if there is a readPending which was not processed yet.
89 // This could be for two reasons:
90 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
91 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
92 //
93 // See https://github.com/netty/netty/issues/2254
94 if (!readPending && !config.isAutoRead()) {
95 removeReadOp();
96 }
97 }
98 }
99 }
100
101
102
103 // Write operation (NioServerSocketChannel does not support writing)
104 @Override
105 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
106 final SelectionKey key = selectionKey();
107 final int interestOps = key.interestOps();
108
109 for (;;) {
110 Object msg = in.current();
111 if (msg == null) {
112 // Nothing left to write: if OP_WRITE was registered, remove it
113 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
114 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
115 }
116 break;
117 }
118 try {
119 boolean done = false;
120 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
121 // The actual write is delegated to the subclass (NioServerSocketChannel does not support writing)
122 if (doWriteMessage(msg, in)) {
123 done = true;
124 break;
125 }
126 }
127
128 if (done) {
129 in.remove();
130 } else {
131 // Did not write all messages.
132 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
133 key.interestOps(interestOps | SelectionKey.OP_WRITE);
134 }
135 break;
136 }
137 } catch (Exception e) {
138 if (continueOnWriteError()) {
139 in.remove(e);
140 } else {
141 throw e;
142 }
143 }
144 }
145 }
146
147 /**
148 * Returns {@code true} if we should continue the write loop on a write error.
149 */
150 protected boolean continueOnWriteError() {
151 return false;
152 }
153
154
155 protected boolean closeOnReadError(Throwable cause) {
156 if (!isActive()) {
157 // If the channel is not active anymore for whatever reason we should not try to continue reading.
158 return true;
159 }
160 if (cause instanceof PortUnreachableException) {
161 return false;
162 }
163 if (cause instanceof IOException) {
164 // ServerChannel should not be closed even on IOException because it can often continue
165 // accepting incoming connections. (e.g. too many open files)
166 return !(this instanceof ServerChannel);
167 }
168 return true;
169 }
170
171 // The actual read and write operations are left to subclasses
172 protected abstract int doReadMessages(List<Object> buf) throws Exception;
173
174 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
175 }
最后来看NioServerSocketChannel源码:
// NioServerSocketChannel: wraps a JDK ServerSocketChannel with OP_ACCEPT as its read interest op;
// its "read" accepts connections and emits NioSocketChannel objects. Connect/write operations are unsupported.
1 public class NioServerSocketChannel extends AbstractNioMessageChannel
2 implements io.netty.channel.socket.ServerSocketChannel {
3
4 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
5 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
6
7 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
8
9 // Factory method that opens the JDK NIO ServerSocketChannel
10 private static ServerSocketChannel newSocket(SelectorProvider provider) {
11 try {
12 /**
13 * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
14 * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
15 *
16 * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
17 */
18 return provider.openServerSocketChannel();
19 } catch (IOException e) {
20 throw new ChannelException(
21 "Failed to open a server socket.", e);
22 }
23 }
24
25 private final ServerSocketChannelConfig config;
26
27 // Default constructor; per the original note, this is what ReflectiveChannelFactory invokes when creating the Channel via reflection
28 public NioServerSocketChannel() {
29 this(newSocket(DEFAULT_SELECTOR_PROVIDER));
30 }
31
32 /**
33 * Create a new instance using the given {@link SelectorProvider}.
34 */
35 public NioServerSocketChannel(SelectorProvider provider) {
36 this(newSocket(provider));
37 }
38
39 // Wraps a JDK NIO ServerSocketChannel in a Netty NioServerSocketChannel
40 public NioServerSocketChannel(ServerSocketChannel channel) {
41 // Call the superclass constructor; note that the read interest op is set to OP_ACCEPT
42 super(null, channel, SelectionKey.OP_ACCEPT);
43 // Create the config
44 config = new NioServerSocketChannelConfig(this, javaChannel().socket());
45 }
46
47 // Returns the local address as an InetSocketAddress
48 @Override
49 public InetSocketAddress localAddress() {
50 return (InetSocketAddress) super.localAddress();
51 }
52
53 // Returns the channel metadata
54 @Override
55 public ChannelMetadata metadata() {
56 return METADATA;
57 }
58
59 // Returns the config
60 @Override
61 public ServerSocketChannelConfig config() {
62 return config;
63 }
64
65 // Whether the channel is active
66 @Override
67 public boolean isActive() {
68 // Active state is determined by whether the underlying socket is bound
69 return javaChannel().socket().isBound();
70 }
71
72 // Returns the remote address; a server socket channel has none, so this is always null
73 @Override
74 public InetSocketAddress remoteAddress() {
75 return null;
76 }
77
78 // The wrapped JDK channel, narrowed to ServerSocketChannel
79 @Override
80 protected ServerSocketChannel javaChannel() {
81 return (ServerSocketChannel) super.javaChannel();
82 }
83
84 @Override
85 protected SocketAddress localAddress0() {
86 return SocketUtils.localSocketAddress(javaChannel().socket());
87 }
88
89 // Binds the address through the wrapped JDK NIO channel
90 @Override
91 protected void doBind(SocketAddress localAddress) throws Exception {
92 if (PlatformDependent.javaVersion() >= 7) {
93 javaChannel().bind(localAddress, config.getBacklog());
94 } else {
95 javaChannel().socket().bind(localAddress, config.getBacklog());
96 }
97 }
98
99 // Closes the channel
100 @Override
101 protected void doClose() throws Exception {
102 javaChannel().close();
103 }
104
105 // Reads messages
106 @Override
107 protected int doReadMessages(List<Object> buf) throws Exception {
108 // Accept a connection on the underlying ServerSocketChannel (via SocketUtils.accept); the result may be null
109 SocketChannel ch = SocketUtils.accept(javaChannel());
110
111 try {
112 // Wrap the JDK NIO channel in a Netty NioSocketChannel and add it to buf so it travels through the pipeline
113 if (ch != null) {
114 buf.add(new NioSocketChannel(this, ch));
115 return 1;// number of messages read
116 }
117 } catch (Throwable t) {
118 logger.warn("Failed to create a new channel from an accepted socket.", t);
119
120 try {
121 ch.close();
122 } catch (Throwable t2) {
123 logger.warn("Failed to close a socket.", t2);
124 }
125 }
126
127 return 0;
128 }
129
130 // Operations NioServerSocketChannel does not support: they return null or throw UnsupportedOperationException
131 @Override
132 protected boolean doConnect(
133 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
134 throw new UnsupportedOperationException();
135 }
136
137 @Override
138 protected void doFinishConnect() throws Exception {
139 throw new UnsupportedOperationException();
140 }
141
142 @Override
143 protected SocketAddress remoteAddress0() {
144 return null;
145 }
146
147 @Override
148 protected void doDisconnect() throws Exception {
149 throw new UnsupportedOperationException();
150 }
151
152 @Override
153 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
154 throw new UnsupportedOperationException();
155 }
156
157 @Override
158 protected final Object filterOutboundMessage(Object msg) throws Exception {
159 throw new UnsupportedOperationException();
160 }
161
162 /********************************************************************/
163
164
165 private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
166 private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
167 super(channel, javaSocket);
168 }
169
170 @Override
171 protected void autoReadCleared() {
172 clearReadPending();
173 }
174 }
175
176 // Override just to to be able to call directly via unit tests.
177 @Override
178 protected boolean closeOnReadError(Throwable cause) {
179 return super.closeOnReadError(cause);
180 }
181 }
分析完源码,再来看看上一篇文章中提出的问题:
为什么一开始register中注册的interestOps值为0,而非OP_ACCEPT?又是何时会注册OP_ACCEPT呢?
首先我们通过分析NioServerSocketChannel的源码可以看到:
channelFactory会通过反射创建NioServerSocketChannel对象。而反射调用的构造函数中设置了readInterestOp的值为OP_ACCEPT。而在AbstractNioChannel的doBeginRead方法中又会将readInterestOp注册到channel。
根据方法名我们可以猜测在开始读之前,selectableChannel的interestOps会从0被改为OP_ACCEPT。
为了证实这点,我们需要弄清楚开始时register interestOps为0的时机和调用doBeginRead的时机。
首先注册interestOps为0是在AbstractNioChannel的doRegister方法中。我们知道这个方法发生在channel的注册阶段。
再看doBeginRead的函数调用:
之前已经介绍过了注册或者绑定成功后,会调用pipeline.fireChannelActive事件。此时的DefaultChannelPipeline除了传递channelActive事件之外,还会调用readIfAutoRead()。
这个方法会根据Config配置的AutoRead属性来决定是否调用read方法。
而这个属性默认是自动读的。于是就可以调用read方法,并最终为channel注册OP_ACCEPT事件。