【问题标题】:how to get my bytebuf to send my entire message Netty如何让我的 bytebuf 发送我的整个消息 Netty
【发布时间】:2020-05-23 11:53:17
【问题描述】:

我每个人。

我问你是因为我在 Netty 的解码器中的 ByteBuf 有问题。

我想解码由服务器到达但 ByteBuf 无法正常工作的消息。

问题是 ByteBuf 没有占用消息的所有字节。

我解释一下,我有一条长度为 1221 字节的消息(这是一个示例),但缓冲区大小仅为 64 字节。

当我尝试读取时,缓冲区的长度和我有这样的错误:

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(117) + length(101) exceeds writerIndex(192): PooledUnsafeDirectByteBuf(ridx: 117, widx: 192, cap: 192)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:470) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

我认为 Netty 没有时间阅读所有内容并只发送部分消息,但我现在没有时间配置 Netty,因为他必须等到消息全部到达。

如果有人可以帮助我,我很感激

为了获得最大的帮助,我给你解码器的代码

    int length = buffer.readInt();
    int messageType = buffer.readInt();


        Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
        if (supplier == null) {
            LOGGER.debug("This message type isn't supported: {}", messageType);
        } else {
            ByteBuf data = buffer.readBytes(length);
            if (messageType != 6) {
                AbstractMessage message = supplier.get();
                message.read(data, version);
                list.add(message);
                LOGGER.debug("{}", message);
            }
        }
    }

}

消息格式如下: 4 个字节的消息长度 (int) MessageType 4 个字节 (int) n 字节数据(MessageLength 大小)

我给你我用来解释Here的文档。

【问题讨论】:

    标签: java buffer netty


    【解决方案1】:

    非常感谢。 我进行了更改,但您的代码启发了我并解决了我的问题。 我认为我收到了片段消息,但我在缓冲区中没有很多知识。

    如果这篇文章可以帮助其他人,我把更正的代码放在下面

    if (length == 0) {
            length = buffer.readInt();
            messageType = buffer.readInt();
        }
    
        if (buffer.writerIndex() < length + buffer.readerIndex()) {
            // ensure we have enough data so we can also read the message type and the whole message body
            return;
        }
    
        if (!(messageType == MessageType.HEARTBEAT_REQ.getValue() || messageType == MessageType.HEARTBEAT_CONF.getValue())) {
            LOGGER.debug("The message type is : {}", messageType);
        }
    
        Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
        if (supplier == null) {
            LOGGER.debug("This message type isn't supported: {}", messageType);
            buffer.skipBytes(length);
        } else {
            ByteBuf data = buffer.readSlice(length);
            AbstractMessage message = supplier.get();
            message.read(data, version);
            list.add(message);
            if (messageType != 6) {
                LOGGER.debug("{}", message);
            }
            length = 0;
            messageType = 0;
        }
    

    请注意,长度和消息类型变量现在在解码器中是静态的

    private static int length = 0;
    private static int messageType = 0;
    

    【讨论】:

      【解决方案2】:

      您需要通过扩展ByteToMessageDecoder 和缓冲区来编写自己的解码器,直到您收到所有内容。由于这是 TCP,您可能会以碎片方式接收字节,因此您需要自己再次组装它。

      这样的事情应该可以工作:

      class MyDecoder extends ByteToMessageDecoder {
      
          @Override
          protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) {
              if (input.readableBytes() < 4) {
                  // we need to have at least 4 bytes to read to be able to get the message length
                  return;
              }
              int length = input.getInt(input.readerIndex());
              if (input.readableBytes() < 8 + length) {
                  // ensure we have enough data so we can also read the message type and the whole message body
                  return;
              }
              // skip the length now
              input.skipBytes(4);
      
              int messageType = input.readInt();
      
              Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
              if (supplier == null) {
                  LOGGER.debug("This message type isn't supported: {}", messageType);
                  input.skip(length);
              } else {
                  if (messageType != 6) {
                      ByteBuf data = buffer.readSlice(length);
                      AbstractMessage message = supplier.get();
                      message.read(data, version);
                      list.add(message);
                      LOGGER.debug("{}", message);
                  }
              }
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-01-13
        • 1970-01-01
        • 2014-04-29
        • 1970-01-01
        • 2022-01-23
        • 1970-01-01
        • 1970-01-01
        • 2012-01-24
        相关资源
        最近更新 更多