【问题标题】:Netty client multiple requestsNetty 客户端多次请求
【发布时间】:2026-01-31 13:35:01
【问题描述】:

首先,我将解释我试图实现的情况和逻辑:

  • 我有多个线程,每个线程的结果都有效,一些名为Result 的对象进入队列QueueToSend

  • 我的NettyClient 在线程中运行,每 1 毫秒从QueueToSend 获取Result,应该连接到服务器并发送一条消息,该消息是从Result 创建的。 我还需要这个连接是异步的。所以我需要NettyHandler 知道Result 列表来发送正确的消息并处理正确的结果,然后再次发送响应。

所以我初始化NettyClient bootstrap

bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

并在应用启动时设置一次管道。 然后,每毫秒我从QueueToSend 获取Result 对象并连接到服务器

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host,port);
ResultConcurrentHashMap.put(future.getChannel().getId(), result);

我决定使用静态ConcurrentHashMap 来保存从QueueToSend 获取的每个与通道相关的结果对象。

第一个问题发生在方法 channelConnected 中的NettyHandler 中,当我尝试从ResultConcurrentHashMap 获取与通道关联的Result 对象时。

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
       Channel channel = ctx.getPipeline.getChannel();
       Result result = ResultConcurrentHashMap.get(channel.getId());
}

但有时result 为空(1 of 50),甚至认为它应该在ResultConcurrentHashMap 中。我认为这是因为channelConnected 事件发生在NettyClient 运行此代码之前:

ResultConcurrentHashMap.put(future.getChannel().getId(), result);

如果我不在本地主机上运行NettyServerNettyClient,它可能不会出现,而是远程运行,建立连接需要更多时间。但我需要解决这个问题。

另一个问题是我每 1 毫秒异步发送一次消息,我认为消息可能混合在一起,服务器无法正确读取它们。如果我一个一个运行它们就可以了:

future.getChannel().getCloseFuture().awaitUninterruptibly();

但我需要异步发送,并处理正确的结果,与通道和发送响应相关联。 我应该实现什么?

【问题讨论】:

    标签: java multithreading client-server netty


    【解决方案1】:

    ChannelFuture 在事件被触发之前异步执行。例如,通道连接未来将在触发通道连接事件之前完成。

    所以你必须在调用 bootstrap.connect() 后注册一个通道未来监听器,并在监听器中编写代码来初始化 HashMap,然后它将对处理程序可见。

            ChannelFuture channelFuture = bootstrap.connect(remoteAddress, localAddress);
    
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    resultConcurrentHashMap.put(future.getChannel().getId(), result);
                }
            });
    

    【讨论】:

    • 我的 NettyClient 每 1 毫秒从队列中获取结果对象,因此 netty 客户端中的结果对象每 1 毫秒更改一次,如果通道连接将在 2 毫秒出现,我将有另一个与此通道关联的结果对象?还是不行?
    • 我猜,你必须在线程(结果生产者、Netty 客户端)之间使用阻塞队列而不是哈希图来交换数据。调用 bootstrap.connect() 后,创建一个 BlockingQueue 作为本地通道。处理程序可以在客户端准备好发送时查找队列,并每 1 毫秒轮询一次队列。从结果生产者那里,只提供数据。不会因为延迟 netty 客户端启动而丢失数据。
    • 感谢您的重播。但在我的情况下它不起作用。我有大量的结果生产者,需要尽可能多地保持异步通道发送、接收消息。现在我 90% 准备好说帧混合是服务器问题。您的第一个答案有助于解决第一个问题,所以我接受您的回答。谢谢