【发布时间】:2022-01-16 09:18:43
【问题描述】:
上下文
我正在编写一个 Netty 应用程序 (Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个EchoHandler,它会响应原始消息,尽管有时会在短暂的延迟之后:
public class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
if (message.equals("delay")) {
ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
}
}
}
问题
使用此代码,不能保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”和其他字符串中的第二条组成,那么响应的顺序将会颠倒!我写了一个测试来说明这一点:
public class Tests {
@Test
public void test() throws ExecutionException, InterruptedException {
var channel = new EmbeddedChannel(new EchoHandler());
channel.writeInbound("delay", "second message");
// Let some time pass and process any scheduled tasks
Thread.sleep(500);
channel.runPendingTasks();
// The response to the last message comes first!
var firstMessage = (String) channel.readOutbound();
Assertions.assertEquals("second message", firstMessage);
// The response to the first message comes second!
var secondMessage = (String) channel.readOutbound();
Assertions.assertEquals("delay", secondMessage);
}
}
问题
我正在 Netty 中寻找一种内置方式来保证传出响应的顺序与传入消息的顺序相匹配。使用库的虚构扩展,我将重写处理程序如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
if (message.equals("delay")) {
ctx.executor().schedule(() -> {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}, 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}
}
Netty 是否提供开箱即用的类似功能?这听起来像是一个常见的用例,我更愿意依赖经过实战测试的代码,而不是编写自己的队列实现。
【问题讨论】:
标签: java asynchronous netty