【问题标题】:Make order of responses match order of requests in Netty使响应顺序与 Netty 中的请求顺序匹配
【发布时间】:2022-01-16 09:18:43
【问题描述】:

上下文

我正在编写一个 Netty 应用程序 (Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个EchoHandler,它会响应原始消息,尽管有时会在短暂的延迟之后:

public class EchoHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    var message = (String) msg;

    if (message.equals("delay")) {
      ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
    } else {
      ctx.writeAndFlush(message);
    }
  }
}

问题

使用此代码,不能保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”和其他字符串中的第二条组成,那么响应的顺序将会颠倒!我写了一个测试来说明这一点:

public class Tests {
  @Test
  public void test() throws ExecutionException, InterruptedException {
    var channel = new EmbeddedChannel(new EchoHandler());
    channel.writeInbound("delay", "second message");

    // Let some time pass and process any scheduled tasks
    Thread.sleep(500);
    channel.runPendingTasks();

    // The response to the last message comes first!
    var firstMessage = (String) channel.readOutbound();
    Assertions.assertEquals("second message", firstMessage);

    // The response to the first message comes second!
    var secondMessage = (String) channel.readOutbound();
    Assertions.assertEquals("delay", secondMessage);
  }
}

问题

我正在 Netty 中寻找一种内置方式来保证传出响应的顺序与传入消息的顺序相匹配。使用库的虚构扩展,我将重写处理程序如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
  var message = (String) msg;

  ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
  
  if (message.equals("delay")) {
    ctx.executor().schedule(() -> {
      ctx.writeAndFlush(message);
      ctx.resumeMessageProcessing(); // After flushing we can resume message processing
    }, 400, TimeUnit.MILLISECONDS);
  } else {
    ctx.writeAndFlush(message);
    ctx.resumeMessageProcessing(); // After flushing we can resume message processing
  }
}

Netty 是否提供开箱即用的类似功能?这听起来像是一个常见的用例,我更愿意依赖经过实战测试的代码,而不是编写自己的队列实现。

【问题讨论】:

    标签: java asynchronous netty


    【解决方案1】:

    虽然不是内置的,但这个问题的一个看似惯用的解决方案是自定义MessageToMessageCodec。基于使用String 作为消息类型的示例,我编写了以下编解码器来为您处理排队:

    public class QueuingCodec extends MessageToMessageCodec<String, String> {
    
      private final Queue<String> messageQueue = new ArrayDeque<>();
      private boolean processingMessage = false;
    
      @Override
      protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
          throws Exception {
    
        // Pass the message on to output
        list.add(s);
    
        // Allow processing more messages
        processingMessage = false;
    
        // Send the next message in the queue if available
        if (!messageQueue.isEmpty()) {
          processingMessage = true;
          var message = messageQueue.poll();
    
          // Pass the message on to input
          ctx.executor().execute(() -> {
            ctx.fireChannelRead(message);
          });
        }
      }
    
      @Override
      protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
          throws Exception {
    
        if (processingMessage) {
          // Store message for later processing
          messageQueue.add(s);
        } else {
          // Pass data on to input
          list.add(s);
    
          // Prevent processing of new messages
          processingMessage = true;
        }
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-04
      相关资源
      最近更新 更多