https://www.cnblogs.com/chenyangyao/p/5795100.html
一、首先来看一段服务端的示例代码:
1 public class NettyTestServer {
2 public void bind(int port) throws Exception{
3 EventLoopGroup bossgroup = new NioEventLoopGroup();//创建BOSS线程组
4 EventLoopGroup workgroup = new NioEventLoopGroup();//创建WORK线程组
5 try{
6 ServerBootstrap b = new ServerBootstrap();
7 b.group(bossgroup,workgroup)//绑定BOSS和WORK线程组
8 .channel(NioServerSocketChannel.class)//设置channel类型,服务端用的是NioServerSocketChannel
9 .option(ChannelOption.SO_BACKLOG,100) //设置channel的配置选项
10 .handler(new LoggingHandler(LogLevel.INFO))//设置NioServerSocketChannel的Handler
11 .childHandler(new ChannelInitializer<SocketChannel>() {//设置childHandler,作为新建的NioSocketChannel的初始化Handler
12 @Override//当新建的与客户端通信的NioSocketChannel被注册到EventLoop成功时,该方法会被调用,用于添加业务Handler
13 protected void initChannel(SocketChannel ch) throws Exception {
14 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
15 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
16 ch.pipeline().addLast(new StringDecoder());
17 ch.pipeline().addLast(new EchoServerHandler());
18 }
19 });
20 ChannelFuture f = b.bind(port).sync();//同步等待绑定结束
21 f.channel().closeFuture().sync();//同步等待关闭
22 }finally {
23 bossgroup.shutdownGracefully();
24 workgroup.shutdownGracefully();
25 }
26 }
27 public static void main(String[] args) throws Exception{
28 int port = 8082;
29 new NettyTestServer().bind(port);
30 }
31 }
32 @ChannelHandler.Sharable
33 class EchoServerHandler extends ChannelInboundHandlerAdapter{
34 int count = 0;
35
36 @Override
37 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
38 String body = (String)msg;
39 System.out.println("This is" + ++count + "times receive client:[" + body + "]");
40 body += "$_";
41 ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
42 ctx.writeAndFlush(echo);
43 ctx.fireChannelRead("my name is chenyang");
44 }
45
46 @Override
47 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
48 cause.printStackTrace();
49 ctx.close();
50 }
51 }
二、首先来看一下ServerBootstrap类,顾名思义,它是一个服务端启动类,用于帮助用户快速配置、启动服务端服务。先来看一下该类的主要成员定义:
1 /**
2 * {@link Bootstrap} sub-class which allows easy bootstrap of {@link ServerChannel}
3 *
4 */
5 public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
6
7 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
8 //以下都是针对NioSocketChannel的
9 private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
10 private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
11 private volatile EventLoopGroup childGroup;
12 private volatile ChannelHandler childHandler;
可见,ServerBootstrap是AbstractBootstrap的子类,AbstractBootstrap的成员主要有:
1 /**
2 * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
3 * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
4 *
5 * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
6 * transports such as datagram (UDP).</p>
7 */
8 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
9 //以下都是针对服务端NioServerSocketChannel的
10 volatile EventLoopGroup group;
11 private volatile ChannelFactory<? extends C> channelFactory;
12 private volatile SocketAddress localAddress;
13 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
14 private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
15 private volatile ChannelHandler handler;
用一张图说明两个类之间的关系如下(原图出自:http://blog.csdn.net/zxhoo/article/details/17532857)。
总结如下: ServerBootstrap比AbstractBootstrap多了4个成员(Part),其中AbstractBootstrap的成员用于设置服务端NioServerSocketChannel(包括所使用的线程组、使用的channel工厂类、使用的Handler以及地址和选项信息等),ServerBootstrap的4个成员用于设置有新连接时新建的NioSocketChannel。
三、ServerBootstrap配置源码解释
1)b.group(bossgroup,workgroup)
1 /**
2 * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
3 * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
4 * {@link Channel}'s.
 *
 * @throws NullPointerException if {@code childGroup} is {@code null}
 * @throws IllegalStateException if a child group has already been set
5 */
6 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
7 super.group(parentGroup);// hand the parent (boss) group to the superclass setter
8 if (childGroup == null) {
9 throw new NullPointerException("childGroup");
10 }
11 if (this.childGroup != null) {
12 throw new IllegalStateException("childGroup set already");
13 }
14 this.childGroup = childGroup;// remember the child (worker) group
15 return this;
16 }
2) .channel(NioServerSocketChannel.class)
1 /**
2 * The {@link Class} which is used to create {@link Channel} instances from.
3 * You either use this or {@link #channelFactory(ChannelFactory)} if your
4 * {@link Channel} implementation has no no-args constructor.
 *
 * @throws NullPointerException if {@code channelClass} is {@code null}
5 */
6 public B channel(Class<? extends C> channelClass) {
7 if (channelClass == null) {
8 throw new NullPointerException("channelClass");
9 }
10 return channelFactory(new BootstrapChannelFactory<C>(channelClass));// wrap the class in a BootstrapChannelFactory and install it as the channel factory
11 }
channelFactory方法就是用来设置channel工厂的,这里的工厂就是BootstrapChannelFactory(是一个泛型类)。
1 /**
2 * {@link ChannelFactory} which is used to create {@link Channel} instances from
3 * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
4 * is not working for you because of some more complex needs. If your {@link Channel} implementation
5 * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
6 * simplify your code.
 *
 * @throws NullPointerException if {@code channelFactory} is {@code null}
 * @throws IllegalStateException if a channel factory has already been set
7 */
8 @SuppressWarnings("unchecked")
9 public B channelFactory(ChannelFactory<? extends C> channelFactory) {
10 if (channelFactory == null) {
11 throw new NullPointerException("channelFactory");
12 }
13 if (this.channelFactory != null) {
14 throw new IllegalStateException("channelFactory set already");
15 }
16
17 this.channelFactory = channelFactory;// store the channel factory
18 return (B) this;
19 }
下面就是channel工厂类的实现,构造函数传入一个channel类型(针对服务端也就是NioServerSocketChannel.class),BootstrapChannelFactory工厂类提供的newChannel方法将使用反射创建对应的channel。由于channel的创建一般只在启动的时候进行,因此使用反射不会造成性能的问题。
// ChannelFactory that creates channels reflectively from the class passed to its constructor.
1 private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
2 private final Class<? extends T> clazz;
3
4 BootstrapChannelFactory(Class<? extends T> clazz) {
5 this.clazz = clazz;
6 }
7
8 @Override
9 public T newChannel() {// invoked whenever a new channel has to be created
10 try {
11 return clazz.newInstance();// create the channel reflectively via its no-arg constructor
12 } catch (Throwable t) {
13 throw new ChannelException("Unable to create Channel from class " + clazz, t);
14 }
15 }
16
17 @Override
18 public String toString() {
19 return StringUtil.simpleClassName(clazz) + ".class";
20 }
21 }
3) .option(ChannelOption.SO_BACKLOG,100)
用来设置channel的选项,比如设置BackLog的大小等。
1 /**
2 * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
3 * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
 *
 * @throws NullPointerException if {@code option} is {@code null}
4 */
5 @SuppressWarnings("unchecked")
6 public <T> B option(ChannelOption<T> option, T value) {
7 if (option == null) {
8 throw new NullPointerException("option");
9 }
10 if (value == null) {
// every access to the options map happens while holding the map's own monitor
11 synchronized (options) {
12 options.remove(option);
13 }
14 } else {
15 synchronized (options) {
16 options.put(option, value);
17 }
18 }
19 return (B) this;
20 }
4) .handler(new LoggingHandler(LogLevel.INFO))
用于设置服务端NioServerSocketChannel的Handler。
1 /**
2 * the {@link ChannelHandler} to use for serving the requests.
 *
 * @throws NullPointerException if {@code handler} is {@code null}
3 */
4 @SuppressWarnings("unchecked")
5 public B handler(ChannelHandler handler) {
6 if (handler == null) {
7 throw new NullPointerException("handler");
8 }
9 this.handler = handler;// sets the field declared in AbstractBootstrap; per the article this handler serves the NioServerSocketChannel
10 return (B) this;
11 }
5) .childHandler(new ChannelInitializer<SocketChannel>() {
一定要分清.handler和.childHandler的区别,首先,两者都是设置一个Handler,但是,前者设置的Handler是属于服务端NioServerSocketChannel的,而后者设置的Handler是属于每一个新建的NioSocketChannel的(每当有一个来自客户端的连接时,都会创建一个新的NioSocketChannel)。
1 /**
2 * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
 *
 * @throws NullPointerException if {@code childHandler} is {@code null}
3 */
4 public ServerBootstrap childHandler(ChannelHandler childHandler) {
5 if (childHandler == null) {
6 throw new NullPointerException("childHandler");
7 }
8 this.childHandler = childHandler;
9 return this;
10 }
至此,ServerBootstrap的配置完成。其实有人可能会很好奇,为什么不直接在ServerBootstrap的构造函数中一步完成这些初始化配置操作?这样做虽然可以,但是会导致ServerBootstrap构造函数的参数过多,而使用Builder模式(也就是ServerBootstrap目前采用的模式,可以参见《Effective Java》)则可以有效地解决构造方法参数过多的问题。
四、bind流程
1)一切从bind开始 ChannelFuture f = b.bind(port).sync();
1 /**
2 * Create a new {@link Channel} and bind it.
 *
 * Wraps {@code inetPort} in an {@link InetSocketAddress} and delegates to the
 * {@code SocketAddress} overload.
3 */
4 public ChannelFuture bind(int inetPort) {
5 return bind(new InetSocketAddress(inetPort));
6 }
继续深入bind
1 /**
2 * Create a new {@link Channel} and bind it.
 *
 * Calls {@code validate()} first, then delegates to {@code doBind(localAddress)}.
 *
 * @throws NullPointerException if {@code localAddress} is {@code null}
3 */
4 public ChannelFuture bind(SocketAddress localAddress) {
5 validate();
6 if (localAddress == null) {
7 throw new NullPointerException("localAddress");
8 }
9 return doBind(localAddress);
10 }
继续深入doBind
// Creates and registers the channel, then binds it to localAddress: immediately when the
// registration future is already done, otherwise from a listener on that future.
1 private ChannelFuture doBind(final SocketAddress localAddress) {
2 final ChannelFuture regFuture = initAndRegister();// initialize and register a channel
3 final Channel channel = regFuture.channel();
4 if (regFuture.cause() != null) {
5 return regFuture;
6 }
7 // has the registration already completed?
8 if (regFuture.isDone()) {
9 // At this point we know that the registration was complete and successful.
10 ChannelPromise promise = channel.newPromise();
11 doBind0(regFuture, channel, localAddress, promise);// per the article, this is what ends up calling channel.bind()
12 return promise;
13 } else {
14 // Registration future is almost always fulfilled already, but just in case it's not.
15 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
16 regFuture.addListener(new ChannelFutureListener() {
17 @Override
18 public void operationComplete(ChannelFuture future) throws Exception {
19 Throwable cause = future.cause();
20 if (cause != null) {
21 // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
22 // IllegalStateException once we try to access the EventLoop of the Channel.
23 promise.setFailure(cause);
24 } else {
25 // Registration was successful, so set the correct executor to use.
26 // See https://github.com/netty/netty/issues/2586
27 promise.executor = channel.eventLoop();
28 }
29 doBind0(regFuture, channel, localAddress, promise);
30 }
31 });
32 return promise;
33 }
34 }
doBind中最重要的一步就是调用initAndRegister方法了,它会初始化并注册一个channel,直接看源码吧。
// Creates a channel through the configured factory, initializes it via init(channel) and
// registers it with group(); returns the registration future (or an already-failed promise
// when init throws).
1 final ChannelFuture initAndRegister() {
2 final Channel channel = channelFactory().newChannel();// the channel factory configured earlier is finally used here
3 try {
4 init(channel);// initialize the channel (the NioServerSocketChannel in the article's example)
5 } catch (Throwable t) {
6 channel.unsafe().closeForcibly();
7 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
8 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
9 }
10
11 ChannelFuture regFuture = group().register(channel);// register the channel with the EventLoopGroup
12 if (regFuture.cause() != null) {
13 if (channel.isRegistered()) {
14 channel.close();
15 } else {
16 channel.unsafe().closeForcibly();
17 }
18 }
19
20 // If we are here and the promise is not failed, it's one of the following cases:
21 // 1) If we attempted registration from the event loop, the registration has been completed at this point.
22 // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
23 // 2) If we attempted registration from the other thread, the registration request has been successfully
24 // added to the event loop's task queue for later execution.
25 // i.e. It's safe to attempt bind() or connect() now:
26 // because bind() or connect() will be executed *after* the scheduled registration task is executed
27 // because register(), bind(), and connect() are all bound to the same thread.
28
29 return regFuture;
30 }
先来看一下init方法
// Applies the configured options and attributes to the given channel, adds the configured
// handler to its pipeline, and finally adds a ChannelInitializer that installs a
// ServerBootstrapAcceptor built from a snapshot of the child* settings.
1 @Override
2 void init(Channel channel) throws Exception {
3 final Map<ChannelOption<?>, Object> options = options();
4 synchronized (options) {
5 channel.config().setOptions(options);// apply the channel options configured earlier
6 }
7
8 final Map<AttributeKey<?>, Object> attrs = attrs();
9 synchronized (attrs) {
10 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
11 @SuppressWarnings("unchecked")
12 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
13 channel.attr(key).set(e.getValue());// apply the attributes configured earlier
14 }
15 }
16
17 ChannelPipeline p = channel.pipeline();// the channel's pipeline (per the article it is created together with the channel)
18 if (handler() != null) {// only when the user configured a handler
19 p.addLast(handler());// add that handler to the server channel's pipeline
20 }
21 // snapshot the four child-related settings; they are handed to the acceptor below
22 final EventLoopGroup currentChildGroup = childGroup;
23 final ChannelHandler currentChildHandler = childHandler;
24 final Entry<ChannelOption<?>, Object>[] currentChildOptions;
25 final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
26 synchronized (childOptions) {
27 currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
28 }
29 synchronized (childAttrs) {
30 currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
31 }
32 // add an initializer to the server channel's pipeline; per the article its initChannel runs once the channel has been registered with its EventLoop
33 p.addLast(new ChannelInitializer<Channel>() {
34 @Override
35 public void initChannel(Channel ch) throws Exception {
36 ch.pipeline().addLast(new ServerBootstrapAcceptor(// add the ServerBootstrapAcceptor to the server channel's pipeline;
// per the article, that handler registers each newly created NioSocketChannel with the child EventLoopGroup
37 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
38 }
39 });
40 }
init执行之后,接下来看一下注册过程(ChannelFuture regFuture = group().register(channel); 注意,这里的group是之前设置的BOSS EventLoopGroup)
// Group-level register: delegates to whatever next() returns.
1 @Override
2 public ChannelFuture register(Channel channel) {
3 return next().register(channel);// per the article, next() picks the next EventLoop of the boss group, which then performs the registration
4 }
// Creates a DefaultChannelPromise for the channel (with this as executor) and delegates to
// the two-argument overload.
1 @Override
2 public ChannelFuture register(Channel channel) {
3 return register(channel, new DefaultChannelPromise(channel, this));
4 }
// Null-checks both arguments, then hands the registration to the channel's unsafe and
// returns the given promise.
1 @Override
2 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
3 if (channel == null) {
4 throw new NullPointerException("channel");
5 }
6 if (promise == null) {
7 throw new NullPointerException("promise");
8 }
9
10 channel.unsafe().register(this, promise);// the unsafe object performs the actual operation
11 return promise;
12 }
// Validates the request, binds the channel to eventLoop and runs register0(promise) either
// directly (when already on that event loop) or as a task submitted to it. Failures are
// reported through the promise.
1 @Override
2 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
3 if (eventLoop == null) {
4 throw new NullPointerException("eventLoop");
5 }
6 if (isRegistered()) {
7 promise.setFailure(new IllegalStateException("registered to an event loop already"));
8 return;
9 }
10 if (!isCompatible(eventLoop)) {
11 promise.setFailure(
12 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
13 return;
14 }
15
16 AbstractChannel.this.eventLoop = eventLoop;// bind the EventLoop chosen for this channel
17 // the registration must run on that EventLoop; if the caller is another thread it is wrapped in a task and executed by the EventLoop
18 if (eventLoop.inEventLoop()) {
19 register0(promise);// register
20 } else {
21 try {
22 eventLoop.execute(new OneTimeTask() {
23 @Override
24 public void run() {
25 register0(promise);
26 }
27 });
28 } catch (Throwable t) {
29 logger.warn(
30 "Force-closing a channel whose registration task was not accepted by an event loop: {}",
31 AbstractChannel.this, t);
32 closeForcibly();
33 closeFuture.setClosed();
34 safeSetFailure(promise, t);
35 }
36 }
37 }
// Performs the registration via doRegister(), completes the promise and fires
// channelRegistered (plus channelActive on a first registration of an already-active
// channel). Any Throwable force-closes the channel and fails the promise.
1 private void register0(ChannelPromise promise) {
2 try {
3 // check if the channel is still open as it could be closed in the mean time when the register
4 // call was outside of the eventLoop
5 if (!promise.setUncancellable() || !ensureOpen(promise)) {
6 return;
7 }
8 boolean firstRegistration = neverRegistered;
9 doRegister();// the lowest-level registration call
10 neverRegistered = false;
11 registered = true;
12 safeSetSuccess(promise);// mark the registration as successful
13 pipeline.fireChannelRegistered();// fire channelRegistered through the pipeline (per the article it starts at head)
14 // Only fire a channelActive if the channel has never been registered. This prevents firing
15 // multiple channel actives if the channel is deregistered and re-registered.
16 if (firstRegistration && isActive()) {// first registration and the channel is already active (per the article: for a server channel listening succeeded, for a client channel connect succeeded)
17 pipeline.fireChannelActive();// fire channelActive through the pipeline
18 }
19 } catch (Throwable t) {
20 // Close the channel directly to avoid FD leak.
21 closeForcibly();
22 closeFuture.setClosed();
23 safeSetFailure(promise, t);
24 }
25 }
doRegister会完成在EventLoop的Selector上的注册任务。
// Registers the underlying Java channel with the event loop's selector, attaching this
// channel object. On a CancelledKeyException it forces one selectNow() and retries once
// before rethrowing.
1 @Override
2 protected void doRegister() throws Exception {
3 boolean selected = false;
4 for (;;) {
5 try {
6 selectionKey = javaChannel().register(eventLoop().selector, 0, this);// note: the interest ops are 0 here, so the key is not yet interested in any I/O event
7 return;
8 } catch (CancelledKeyException e) {
9 if (!selected) {
10 // Force the Selector to select now as the "canceled" SelectionKey may still be
11 // cached and not removed because no Select.select(..) operation was called yet.
12 eventLoop().selectNow();
13 selected = true;
14 } else {
15 // We forced a select operation on the selector before but the SelectionKey is still cached
16 // for whatever reason. JDK bug ?
17 throw e;
18 }
19 }
20 }
21 }
由上可知,注册成功后,NioServerSocketChannel还不能监听读写事件,那么什么时候会开始监听呢?由于注册成功之后,会进行pipeline.fireChannelRegistered()调用,该事件会在NioServerSocketChannel的pipeline中传播(从head开始,逐步findContextInbound),这会导致Inbound类型的Handler的channelRegistered方法被调用。还记得在init方法中为NioServerSocketChannel添加的ChannelInitializer类型的Handler吗?它也是一个InboundHandler,看一下它的实现:
// Inbound handler whose channelRegistered callback runs the subclass-provided initChannel
// once and then removes itself from the pipeline.
1 @Sharable
2 public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
3
4 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
5
6 /**
7 * This method will be called once the {@link Channel} was registered. After the method returns this instance
8 * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
9 *
10 * @param ch the {@link Channel} which was registered.
11 * @throws Exception is thrown if an error occurs. In that case the {@link Channel} will be closed.
12 */
13 protected abstract void initChannel(C ch) throws Exception;// abstract; implemented by subclasses
14
15 @Override
16 @SuppressWarnings("unchecked")
17 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {// invoked when the channelRegistered event reaches this handler (in the article: after the NioServerSocketChannel was registered)
18 ChannelPipeline pipeline = ctx.pipeline();
19 boolean success = false;
20 try {
21 initChannel((C) ctx.channel());// run the subclass initialization
22 pipeline.remove(this);// the initializer is only needed once, so it removes itself afterwards
23 ctx.fireChannelRegistered();// keep propagating the channelRegistered event
24 success = true;
25 } catch (Throwable t) {
26 logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
27 } finally {
// make sure the initializer is gone even when initChannel threw before the remove above
28 if (pipeline.context(this) != null) {
29 pipeline.remove(this);
30 }
31 if (!success) {
32 ctx.close();
33 }
34 }
35 }
36 }
再重复贴一次代码,看一下initChannel里面是什么
1 p.addLast(new ChannelInitializer<Channel>() {
2 @Override
3 public void initChannel(Channel ch) throws Exception {//被channelRegistered调用
4 ch.pipeline().addLast(new ServerBootstrapAcceptor(
5 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
6 }
7 }
可以看到,initChannel只是向pipeline中添加了ServerBootstrapAcceptor类型的Handler。
但是这还是没有看到给NioServerSocketChannel注册读写事件的地方,继续看之前的register0代码,它还会调用pipeline的fireChannelActive方法,看一下该方法的代码:
// Pipeline-level fireChannelActive: starts the event at head and, when auto-read is
// enabled, triggers a read on the channel.
1 @Override
2 public ChannelPipeline fireChannelActive() {
3 head.fireChannelActive();// propagate the channelActive event through the pipeline
4 // if the channel is configured for auto-read, trigger a read
5 if (channel.config().isAutoRead()) {
6 channel.read();// article's call chain: pipeline.read() --> tail.read() --> ... --> head.read() --> unsafe.beginRead()
7 }
8
9 return this;
10 }
// Context-level fireChannelActive: finds the next inbound context and invokes its
// channelActive on that context's executor, directly or as a submitted task.
1 @Override
2 public ChannelHandlerContext fireChannelActive() {// in the article's walkthrough this is invoked on head
3 final AbstractChannelHandlerContext next = findContextInbound();// look up the next inbound context
4 EventExecutor executor = next.executor();
5 if (executor.inEventLoop()) {
6 next.invokeChannelActive();// invoke channelActive on that context's handler
7 } else {
8 executor.execute(new OneTimeTask() {
9 @Override
10 public void run() {
11 next.invokeChannelActive();
12 }
13 });
14 }
15 return this;
16 }
看一下beginRead实现:
// Does nothing unless the channel is active; otherwise calls doBeginRead(). If that throws,
// an exceptionCaught event is scheduled and the channel is closed.
1 @Override
2 public final void beginRead() {
3 if (!isActive()) {
4 return;
5 }
6
7 try {
8 doBeginRead();// where the read interest is actually registered
9 } catch (final Exception e) {
10 invokeLater(new OneTimeTask() {
11 @Override
12 public void run() {
13 pipeline.fireExceptionCaught(e);
14 }
15 });
16 close(voidPromise());
17 }
18 }
// Adds readInterestOp to the selection key's interest set unless input is shut down, the
// key is invalid, or the bit is already set.
1 @Override
2 protected void doBeginRead() throws Exception {
3 // Channel.read() or ChannelHandlerContext.read() was called
4 if (inputShutdown) {
5 return;
6 }
7
8 final SelectionKey selectionKey = this.selectionKey;
9 if (!selectionKey.isValid()) {
10 return;
11 }
12
13 readPending = true;
14
15 final int interestOps = selectionKey.interestOps();
16 if ((interestOps & readInterestOp) == 0) {
17 selectionKey.interestOps(interestOps | readInterestOp);// this is the actual registration of the read interest
18 }
19 }
五、客户端接入过程
接下来看看,当一个客户端连接进来时,都发生了什么。
1)首先从事件的源头看起,下面是EventLoop的事件循环
// Event loop body: repeatedly selects, processes the selected keys, runs queued tasks
// (time-boxed according to ioRatio unless it is 100) and leaves the loop once shutdown has
// been confirmed. Any Throwable is logged and followed by a one-second sleep.
1 @Override
2 protected void run() {
3 for (;;) {
4 boolean oldWakenUp = wakenUp.getAndSet(false);
5 try {
6 if (hasTasks()) {
7 selectNow();
8 } else {
9 select(oldWakenUp);// per the article, this ends up calling selector.select()
10
11 // 'wakenUp.compareAndSet(false, true)' is always evaluated
12 // before calling 'selector.wakeup()' to reduce the wake-up
13 // overhead. (Selector.wakeup() is an expensive operation.)
14 //
15 // However, there is a race condition in this approach.
16 // The race condition is triggered when 'wakenUp' is set to
17 // true too early.
18 //
19 // 'wakenUp' is set to true too early if:
20 // 1) Selector is waken up between 'wakenUp.set(false)' and
21 // 'selector.select(...)'. (BAD)
22 // 2) Selector is waken up between 'selector.select(...)' and
23 // 'if (wakenUp.get()) { ... }'. (OK)
24 //
25 // In the first case, 'wakenUp' is set to true and the
26 // following 'selector.select(...)' will wake up immediately.
27 // Until 'wakenUp' is set to false again in the next round,
28 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
29 // any attempt to wake up the Selector will fail, too, causing
30 // the following 'selector.select(...)' call to block
31 // unnecessarily.
32 //
33 // To fix this problem, we wake up the selector again if wakenUp
34 // is true immediately after selector.select(...).
35 // It is inefficient in that it wakes up the selector for both
36 // the first case (BAD - wake-up required) and the second case
37 // (OK - no wake-up required).
38
39 if (wakenUp.get()) {
40 selector.wakeup();
41 }
42 }
43
44 cancelledKeys = 0;
45 needsToSelectAgain = false;
46 final int ioRatio = this.ioRatio;
47 if (ioRatio == 100) {
48 processSelectedKeys();
49 runAllTasks();
50 } else {
51 final long ioStartTime = System.nanoTime();
52
53 processSelectedKeys();// selected I/O events are handled here
54
55 final long ioTime = System.nanoTime() - ioStartTime;
// task budget is derived from the measured I/O time and ioRatio
56 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
57 }
58
59 if (isShuttingDown()) {
60 closeAll();
61 if (confirmShutdown()) {
62 break;
63 }
64 }
65 } catch (Throwable t) {
66 logger.warn("Unexpected exception in the selector loop.", t);
67
68 // Prevent possible consecutive immediate failures that lead to
69 // excessive CPU consumption.
70 try {
71 Thread.sleep(1000);
72 } catch (InterruptedException e) {
73 // Ignore.
74 }
75 }
76 }
77 }
看一下processSelectedKeys代码
// Dispatches to the optimized path when the selectedKeys field is non-null, otherwise to
// the plain path over selector.selectedKeys().
1 private void processSelectedKeys() {
2 if (selectedKeys != null) {
3 processSelectedKeysOptimized(selectedKeys.flip());// the branch followed in the article's walkthrough
4 } else {
5 processSelectedKeysPlain(selector.selectedKeys());
6 }
7 }
// Walks the key array until the first null entry, nulling each slot as it goes and
// dispatching on the type of the key's attachment. When needsToSelectAgain is set, the
// remaining slots are cleared, a new select is performed and iteration restarts.
1 private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
2 for (int i = 0;; i ++) {
3 final SelectionKey k = selectedKeys[i];
4 if (k == null) {
5 break;
6 }
7 // null out entry in the array to allow to have it GC'ed once the Channel close
8 // See https://github.com/netty/netty/issues/2363
9 selectedKeys[i] = null;
10
11 final Object a = k.attachment();
12
13 if (a instanceof AbstractNioChannel) {// article's walkthrough: the attachment is the NioServerSocketChannel, so this branch is taken
14 processSelectedKey(k, (AbstractNioChannel) a);
15 } else {
16 @SuppressWarnings("unchecked")
17 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
18 processSelectedKey(k, task);
19 }
20
21 if (needsToSelectAgain) {
22 // null out entries in the array to allow to have it GC'ed once the Channel close
23 // See https://github.com/netty/netty/issues/2363
24 for (;;) {
25 if (selectedKeys[i] == null) {
26 break;
27 }
28 selectedKeys[i] = null;
29 i++;
30 }
31
32 selectAgain();
33 // Need to flip the optimized selectedKeys to get the right reference to the array
34 // and reset the index to -1 which will then set to 0 on the for loop
35 // to start over again.
36 //
37 // See https://github.com/netty/netty/issues/1523
38 selectedKeys = this.selectedKeys.flip();
39 i = -1;
40 }
41 }
42 }
// Handles one selected key for a channel: closes the channel when the key is invalid,
// otherwise dispatches on the ready ops (read/accept, write, connect). A
// CancelledKeyException closes the channel.
1 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
2 final NioUnsafe unsafe = ch.unsafe();
3 if (!k.isValid()) {
4 // close the channel if the key is not valid anymore
5 unsafe.close(unsafe.voidPromise());
6 return;
7 }
8
9 try {
10 int readyOps = k.readyOps();
11 // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
12 // to a spin loop
13 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
14 unsafe.read();// article's walkthrough: an ACCEPT event lands here (which read() runs differs between NioServerSocketChannel and NioSocketChannel)
15 if (!ch.isOpen()) {
16 // Connection already closed - no need to handle write.
17 return;
18 }
19 }
20 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
21 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
22 ch.unsafe().forceFlush();
23 }
24 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
25 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
26 // See https://github.com/netty/netty/issues/924
27 int ops = k.interestOps();
28 ops &= ~SelectionKey.OP_CONNECT;
29 k.interestOps(ops);
30
31 unsafe.finishConnect();
32 }
33 } catch (CancelledKeyException ignored) {
34 unsafe.close(unsafe.voidPromise());
35 }
36 }
NioServerSocketChannel继承了AbstractNioMessageChannel,所以执行的是AbstractNioMessageChannel的版本
// Message-oriented read: collects messages into readBuf via doReadMessages until nothing
// more is read, auto-read is off, or maxMessagesPerRead is reached; then fires channelRead
// for each message followed by channelReadComplete, and handles any captured exception.
1 @Override
2 public void read() {
3 assert eventLoop().inEventLoop();
4 final ChannelConfig config = config();
5 if (!config.isAutoRead() && !isReadPending()) {
6 // ChannelConfig.setAutoRead(false) was called in the meantime
7 removeReadOp();
8 return;
9 }
10
11 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
12 final ChannelPipeline pipeline = pipeline();// pipeline of this channel (the server-side NioServerSocketChannel in the article's walkthrough)
13 boolean closed = false;
14 Throwable exception = null;
15 try {
16 try {
17 for (;;) {
18 int localRead = doReadMessages(readBuf);// the step followed in the article's walkthrough
19 if (localRead == 0) {
20 break;
21 }
22 if (localRead < 0) {
23 closed = true;
24 break;
25 }
26
27 // stop reading and remove op
28 if (!config.isAutoRead()) {
29 break;
30 }
31
32 if (readBuf.size() >= maxMessagesPerRead) {
33 break;
34 }
35 }
36 } catch (Throwable t) {
37 exception = t;
38 }
39 setReadPending(false);
40 int size = readBuf.size();
41 for (int i = 0; i < size; i ++) {
42 pipeline.fireChannelRead(readBuf.get(i));// fire channelRead for each collected message
43 }
44
45 readBuf.clear();
46 pipeline.fireChannelReadComplete();// fire channelReadComplete
47
48 if (exception != null) {
49 if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
50 // ServerChannel should not be closed even on IOException because it can often continue
51 // accepting incoming connections. (e.g. too many open files)
52 closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
53 }
54
55 pipeline.fireExceptionCaught(exception);
56 }
57
58 if (closed) {
59 if (isOpen()) {
60 close(voidPromise());
61 }
62 }
63 } finally {
64 // Check if there is a readPending which was not processed yet.
65 // This could be for two reasons:
66 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
67 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
68 //
69 // See https://github.com/netty/netty/issues/2254
70 if (!config.isAutoRead() && !isReadPending()) {
71 removeReadOp();
72 }
73 }
74 }
而对于NioSocketChannel而言,其继承自AbstractNioByteChannel,因此调用的AbstractNioByteChannel的read版本如下:
// Byte-oriented read: repeatedly allocates a buffer, fills it via doReadBytes and fires
// channelRead with it, stopping on an empty/negative read, a partially filled buffer,
// auto-read being off, the overflow guard, or after maxMessagesPerRead iterations. Then
// fires channelReadComplete and records the total for the allocator handle.
1 @Override
2 public final void read() {
3 final ChannelConfig config = config();
4 if (!config.isAutoRead() && !isReadPending()) {
5 // ChannelConfig.setAutoRead(false) was called in the meantime
6 removeReadOp();
7 return;
8 }
9
10 final ChannelPipeline pipeline = pipeline();
11 final ByteBufAllocator allocator = config.getAllocator();
12 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
13 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
14 if (allocHandle == null) {
15 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
16 }
17
18 ByteBuf byteBuf = null;
19 int messages = 0;
20 boolean close = false;
21 try {
22 int totalReadAmount = 0;// running total of bytes read
23 boolean readPendingReset = false;
24 do {
25 byteBuf = allocHandle.allocate(allocator);
26 int writable = byteBuf.writableBytes();// how many bytes the buffer can still take
27 int localReadAmount = doReadBytes(byteBuf);// the actual read; localReadAmount is the value returned for this pass
28 if (localReadAmount <= 0) {// nothing was read (0), or a negative value was returned
29 // not was read release the buffer
30 byteBuf.release();
31 byteBuf = null;
32 close = localReadAmount < 0;
33 break;// leave the loop
34 }
35 if (!readPendingReset) {
36 readPendingReset = true;
37 setReadPending(false);
38 }
39 pipeline.fireChannelRead(byteBuf);// fire channelRead, passing the buffer along
40 byteBuf = null;
41 // adding this read to the running total would reach or exceed Integer.MAX_VALUE
42 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
43 // Avoid overflow.
44 totalReadAmount = Integer.MAX_VALUE;
45 break;// leave the loop
46 }
47 // update the running total
48 totalReadAmount += localReadAmount;
49
50 // stop reading
51 if (!config.isAutoRead()) {
52 break;// auto-read is off: stop after this single read
53 }
54 // the buffer was not filled completely, which suggests all available data has been read
55 if (localReadAmount < writable) {
56 // Read less than what the buffer can hold,
57 // which might mean we drained the recv buffer completely.
58 break;// leave the loop
59 }
60 } while (++ messages < maxMessagesPerRead);
61
62 pipeline.fireChannelReadComplete();// after the loop, fire channelReadComplete
63 allocHandle.record(totalReadAmount);
64
65 if (close) {
66 closeOnRead(pipeline);
67 close = false;
68 }
69 } catch (Throwable t) {
70 handleReadException(pipeline, byteBuf, t, close);
71 } finally {
72 // Check if there is a readPending which was not processed yet.
73 // This could be for two reasons:
74 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
75 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
76 //
77 // See https://github.com/netty/netty/issues/2254
78 if (!config.isAutoRead() && !isReadPending()) {
79 removeReadOp();
80 }
81 }
82 }
接着看doReadMessages
// Accepts one pending connection. When accept() yields a channel, it is wrapped in a
// NioSocketChannel, appended to buf, and 1 is returned; otherwise 0 is returned. A failure
// while wrapping is logged and the accepted socket is closed.
1 @Override
2 protected int doReadMessages(List<Object> buf) throws Exception {
3 SocketChannel ch = javaChannel().accept();// accept the client; yields the JDK SocketChannel for the new connection
4
5 try {
6 if (ch != null) {
7 buf.add(new NioSocketChannel(this, ch));
8 return 1;
9 }
10 } catch (Throwable t) {
11 logger.warn("Failed to create a new channel from an accepted socket.", t);
12
13 try {
14 ch.close();
15 } catch (Throwable t2) {
16 logger.warn("Failed to close a socket.", t2);
17 }
18 }
19
20 return 0;
21 }
执行完doReadMessages之后,针对客户端的SocketChannel已经创建了,由于之后还会引发channelRead和channelReadComplete事件,而这些都会导致pipeline中的ServerBootstrapAcceptor的相应方法被调用,来看一下ServerBootstrapAcceptor源码:
// Inbound handler that treats every message it reads as a newly accepted child Channel:
// it adds the child handler, applies the child options and attributes, and registers the
// child with the child EventLoopGroup.
1 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
2
3 private final EventLoopGroup childGroup;
4 private final ChannelHandler childHandler;
5 private final Entry<ChannelOption<?>, Object>[] childOptions;
6 private final Entry<AttributeKey<?>, Object>[] childAttrs;
7
8 ServerBootstrapAcceptor(
9 EventLoopGroup childGroup, ChannelHandler childHandler,
10 Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
11 this.childGroup = childGroup;
12 this.childHandler = childHandler;
13 this.childOptions = childOptions;
14 this.childAttrs = childAttrs;
15 }
16
17 @Override
18 @SuppressWarnings("unchecked")
19 public void channelRead(ChannelHandlerContext ctx, Object msg) {
20 final Channel child = (Channel) msg;
21
22 child.pipeline().addLast(childHandler);// add the configured childHandler to the accepted channel's pipeline (in the article's example it is itself a ChannelInitializer)
23
24 for (Entry<ChannelOption<?>, Object> e: childOptions) {
25 try {
26 if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
27 logger.warn("Unknown channel option: " + e);
28 }
29 } catch (Throwable t) {
30 logger.warn("Failed to set a channel option: " + child, t);
31 }
32 }
33
34 for (Entry<AttributeKey<?>, Object> e: childAttrs) {
35 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
36 }
37
38 try {
39 childGroup.register(child).addListener(new ChannelFutureListener() {// register the accepted channel with the child (worker) EventLoopGroup
40 @Override
41 public void operationComplete(ChannelFuture future) throws Exception {
42 if (!future.isSuccess()) {
43 forceClose(child, future.cause());
44 }
45 }
46 });
47 } catch (Throwable t) {
48 forceClose(child, t);
49 }
50 }
51
// Force-closes the child through its unsafe and logs the registration failure.
52 private static void forceClose(Channel child, Throwable t) {
53 child.unsafe().closeForcibly();
54 logger.warn("Failed to register an accepted channel: " + child, t);
55 }
56
57 @Override
58 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
59 final ChannelConfig config = ctx.channel().config();
60 if (config.isAutoRead()) {
61 // stop accept new connections for 1 second to allow the channel to recover
62 // See https://github.com/netty/netty/issues/1328
63 config.setAutoRead(false);
64 ctx.channel().eventLoop().schedule(new Runnable() {
65 @Override
66 public void run() {
67 config.setAutoRead(true);
68 }
69 }, 1, TimeUnit.SECONDS);
70 }
71 // still let the exceptionCaught event flow through the pipeline to give the user
72 // a chance to do something with it
73 ctx.fireExceptionCaught(cause);
74 }
75 }
引用一张图(出自:http://blog.csdn.net/zxhoo/article/details/17532857) 。