【发布时间】:2026-01-31 13:35:01
【问题描述】:
首先,我将解释我试图实现的情况和逻辑:
我有多个线程,每个线程的结果都有效,一些名为
Result的对象进入队列QueueToSend我的
NettyClient在线程中运行,每 1 毫秒从QueueToSend获取Result,应该连接到服务器并发送一条消息,该消息是从Result创建的。 我还需要这个连接是异步的。所以我需要NettyHandler知道Result列表来发送正确的消息并处理正确的结果,然后再次发送响应。
所以我初始化NettyClient bootstrap
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
并在应用启动时设置一次管道。
然后,每毫秒我从QueueToSend 获取Result 对象并连接到服务器
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host,port);
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
我决定使用静态ConcurrentHashMap 来保存从QueueToSend 获取的每个与通道相关的结果对象。
第一个问题发生在方法 channelConnected 中的NettyHandler 中,当我尝试从ResultConcurrentHashMap 获取与通道关联的Result 对象时。
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Channel channel = ctx.getPipeline.getChannel();
Result result = ResultConcurrentHashMap.get(channel.getId());
}
但有时result 为空(1 of 50),甚至认为它应该在ResultConcurrentHashMap 中。我认为这是因为channelConnected 事件发生在NettyClient 运行此代码之前:
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
如果我不在本地主机上运行NettyServer 和NettyClient,它可能不会出现,而是远程运行,建立连接需要更多时间。但我需要解决这个问题。
另一个问题是我每 1 毫秒异步发送一次消息,我认为消息可能混合在一起,服务器无法正确读取它们。如果我一个一个运行它们就可以了:
future.getChannel().getCloseFuture().awaitUninterruptibly();
但我需要异步发送,并处理正确的结果,与通道和发送响应相关联。 我应该实现什么?
【问题讨论】:
标签: java multithreading client-server netty