【问题标题】:Netty ChannelInboundHandlerAdapter async/multithreadingNetty ChannelInboundHandlerAdapter 异步/多线程
【发布时间】:2022-02-04 15:30:01
【问题描述】:

我无法理解 netty 中多线程背后的概念,EventLoopGroup (MultithreadEventLoopGroup)、MultithreadEventExecutorGroupDefaultEventExecutorGroup

我试图了解服务器如何处理多个客户端同时发送请求,这些请求将执行一些业务逻辑和添加到 RTT 的 CRUD 操作。下面是我的 netty 服务器代码,它可以工作,但我试图准确了解它如何与并发用户和多个开放频道一起工作。

我有一个简单的ServerBootstrap

@Component
@RequiredArgsConstructor
public class SocketServer {

    private final ContextAwareLogger logger;
    private final ServerInitializer serverInitializer;
    private final NioEventLoopGroup bossGroup;
    private final NioEventLoopGroup workerGroup;

    private Channel mainChannel;

    @PostConstruct
    public void start() {
        try {
            ServerBootstrap bootstrap = init();
            mainChannel = bootstrap.bind(8484).sync().channel(); // save the main channel so we can cleanly close it when app is shutdown
            logger.info("Netty server started...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void stop() throws InterruptedException {
        logger.info("Shutting down Netty server");
        bossGroup.shutdownGracefully().sync();
        workerGroup.shutdownGracefully().sync();
        mainChannel.closeFuture().sync();
        logger.info("Netty Server shutdown complete.");
    }

    private ServerBootstrap init() {
        return new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 5000)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(serverInitializer);
    }

}

ChannelInitializer:

@Component
@RequiredArgsConstructor
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    private final PacketDecoder packetDecoder;
    private final ServerHandler serverHandler;
    private final PacketEncoder packetEncoder;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline()
            .addLast("decoder", packetDecoder) // ByteArrayDecoder
            .addLast("encoder", packetEncoder) // ByteArrayEncoder
            .addLast("inbound", serverHandler); // ChannelInboundHandlerAdapter
    }

}

ChannelInboundHandlerAdapter:

@Component
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    private SomeService someService;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

       // contains db access
       byte[] accept = someService.validateClient(ctx.channel());

       ctx.channel().writeAndFlush(accept);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

       // may contain db access
       byte[] response = someService.processPacket(ctx.channel(), msg));

       ctx.channel().writeAndFlush(response);
    }
}

现在,当客户端连接时,我知道将打开一个新通道并且将重用处理程序。要求是每个客户端请求/响应都需要立即处理,而无需等待其他一些客户端的 CRUD 操作完成。

因为我使用NioEventLoopGroup,我的 channelRead 和 channelActive 等是否异步(即每个客户端的通道操作是否会相互独立运行)?

如果单个客户端串联发送多个请求,是否保证按相同顺序处理?

我需要为我的入站处理程序指定 DefaultEventExecutorGroup 吗? (https://stackoverflow.com/a/28305019/1738539)

【问题讨论】:

    标签: multithreading asynchronous netty nio


    【解决方案1】:

    您要么需要为您的ServerHandler 使用DefaultEventExecutorGroup,要么将validateClient(...) / processPacket(...) 分派到您自己的线程池。不这样做将导致EventLoop 线程阻塞,因此在阻塞操作完成之前,无法为此EventLoop 处理其他IO。

    【讨论】:

    • 谢谢。因此,例如,如果我在同一个 EventLoop 上打开了两个频道,并且 ServerHandler 正在使用 DefaultEventExecutorGroup,则循环将调度频道 1 和频道 2 的事件,按频道分组,或者它会将所有事件异步触发到我的管道?如果我不将DefaultEventExecutorGroup 添加到我的其他入站处理程序(解码器),它会起作用吗?
    • 是的,是的.....
    猜你喜欢
    • 2015-06-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多