【问题标题】:[vert.x]Vertx tcp socket reads partial messages when writing to fast[vert.x]Vertx tcp 套接字在写入快速时读取部分消息
【发布时间】:2018-11-13 16:40:30
【问题描述】:

我正在尝试构建一个实时应用程序,一次发送和接收分配的信息。我已经设法让 Vertx 套接字服务器和客户端正常工作,但我面临一个问题。当我有大量数据要一次写入大约 1000 条记录时,缓冲区似乎读取速度太快,并且部分读取了当前消息之前的消息,例如,如果我发送两条消息

消息 1“你好世界”

消息 2“我在这里”

然后套接字将接收

“Hello worldI am”,然后它会收到“here”

但它应该单独接收这些字符串。我如何强制套接字一次只接收一条消息,就像 HTTP 客户端使用其 request.bodyHandler 方法一样。对此的任何帮助将不胜感激。

这是我的服务器代码

NetServerOptions options = new NetServerOptions();
    options.setReceiveBufferSize(max_buffer_size);
    options.setSendBufferSize(max_buffer_size);
    NetServer server = vertx.createNetServer(options);
    server.exceptionHandler(e -> {
        Debug.log("Client exception " +e.toString()); 
    });


    server.connectHandler(socket -> {
        socket.exceptionHandler(e -> {
            Debug.log("Client socket exception " + e.toString());
        });
        String host = socket.localAddress().host();

        Buffer totalBuffer = Buffer.buffer();
        socket.handler(buffer -> {
            Debug.log("data ::" + buffer.toString());
            final RecordParser parser = RecordParser.newDelimited(delemeter, h -> {
                Debug.log("data SPLIT ::::::::" + h.toString());
                handler.HandleUpdate(h.toString(), socket);
            });
            parser.handle(buffer);
        });
    });

这是我的客户端代码:

NetClientOptions options = new NetClientOptions()
            .setConnectTimeout(this.connection_timeout)
            .setReconnectAttempts(this.connection_attempts)
            .setReconnectInterval(this.connection_reconnect_delay)
            .setReceiveBufferSize(max_buffer_size)
            .setSendBufferSize(max_buffer_size);

    NetClient client = this.vertx.createNetClient(options);
    client.connect(this.port, ip, res -> {
        if (res.succeeded()) {
            System.out.println("Connected!");
            this.client_socket = res.result();

            Buffer totalBuffer = Buffer.buffer();
            this.client_socket.handler(buffer -> {
                    Debug.log("data ::" +buffer.toString()); 
                handler.HandleUpdate(buffer.toString(), null);
            });

        } else {
            System.out.println("Failed to connect: " + res.cause().getMessage());
        }
    });

【问题讨论】:

标签: java sockets vert.x


【解决方案1】:

问题在于您没有 TCP 协议来正确识别不同的消息,并且它与 Vert.x 无关。任何其他客户端/服务器都会遇到同样的问题。

例如,您需要有一种方法来识别消息

  • 在消息中使用长度前缀编码,例如 long 11 + "Hello World"
  • 使用分隔符,例如“Hello World\n”,其中 \n 是字符分隔符

这样解码器可以正确解码消息。

如果您查看 HTTP/1 协议,它就是这样做的,例如分块编码消息:

PUT /foo HTTP/1.1\r\n Transfer-encoding: chunked\r\n \r\n 11\r\n Hello World\r\n 0\r\n \r\n

这使用了前面解释的两种技术。

【讨论】:

    【解决方案2】:

    您的错误只是对 lambda 的误解

    你只需要替换

    socket.handler(buffer -> {
        Debug.log("data ::" + buffer.toString());
        final RecordParser parser = RecordParser.newDelimited(delemeter, h -> {
            Debug.log("data SPLIT ::::::::" + h.toString());
            handler.HandleUpdate(h.toString(), socket);
        });
        parser.handle(buffer);
    });
    

    类似

    socket.handler(
        RecordParser.newDelimited(delemeter, h -> {
            Debug.log("data SPLIT ::::::::" + h.toString());
            handler.HandleUpdate(h.toString(), socket);
        })
    });
    

    首先,每次收到来自套接字的消息时,您都会创建一个新的实例解析器。第二个代码创建了一个解析器实例,它将接收每个缓冲区并正确解析块。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-25
      • 1970-01-01
      • 2018-02-03
      • 1970-01-01
      • 1970-01-01
      • 2013-12-15
      • 2013-07-28
      • 2013-05-25
      相关资源
      最近更新 更多