【问题标题】:SocketChannel. invalid stream header: 00000000套接字通道。无效的流标头:00000000
【发布时间】:2021-04-29 13:44:12
【问题描述】:

我想序列化'Message'对象,我可以通过socketChannel成功地将它作为字节数组传输。之后,我更改了对象的属性(使其可能具有更大的大小),然后将对象发送回客户端时出现问题。 一旦我尝试在客户端获取对象,就会出现异常,它发生在我在 getResponse() 方法中取消实现 Message obj 时:

org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 00000000

但是,不知何故,这只适用于第一个客户端(抛出异常后,与第一个客户端的连接结束),当我启动一个新客户端(不关闭服务器)时,我可以成功地来回传输对象,此外,它适用于任何新客户。

这是我的最小可调试版本:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Client {

    private SocketChannel server;

    public void start() throws IOException {
        try {
            server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            server.configureBlocking(false);
        } catch (IOException e) {
            System.err.println("Server isn't responding");
            System.exit(0);
        }

        Scanner scRequest = new Scanner(System.in);
        Scanner scState = new Scanner(System.in);


        System.out.println("Enter request:");
        String request = scRequest.nextLine();

        while (!request.equals("exit")) {
            try {
                // In my actual project class Person is a way different (But it's still a POJO)
                // I included it here to make sure I can get it back after sending to the server
                System.out.println("Enter a number:");
                Person person = new Person(scState.nextInt());
                sendRequest(request, person);

                System.out.println("\nEnter request:");
                request = scRequest.nextLine();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        stop();
    }

    public void sendRequest(String sMessage, Person person) {
        Message message = new Message(sMessage, person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
            server.write(requestBuffer);
            requestBuffer.clear();
            getResponse();
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.err.println("Connection lost");
            System.exit(0);
        }
    }

    public void getResponse() throws Exception {
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);

        int read = server.read(responseBuffer);
        responseBuffer.clear();
        if(read == -1) {
            throw new Exception();
        }

        byte[] bytes = new byte[responseBuffer.limit()];
        responseBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
    }

    public void stop() throws IOException {
        server.close();
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.*;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {

    public void start() throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started");

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }
                if (key.isReadable()) {
                    try {
                        getRequest(key);
                    } catch (Exception e) {
                        System.err.println(e.getMessage());
                    }
                }
                iter.remove();
            }
        }
    }

    private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();

        ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
        int read = client.read(requestBuffer);
        requestBuffer.clear();

        if(read == -1) {
            key.cancel();
            throw new Exception("Client disconnected at: " +
                    ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
        }

        byte[] bytes = new byte[requestBuffer.limit()];
        requestBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client, message);
    }

    private void sendResponse(SocketChannel client, Message message) throws IOException {

        message.setResult("Some result");

        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        while (responseBuffer.hasRemaining()) {
            client.write(responseBuffer);
        }
        responseBuffer.clear();
    }

    private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
    }

    public static void main(String[] args) throws Exception {
        new Server().start();
    }
}

我尝试将此对象作为字节数组发送:

import java.io.Serializable;
import java.util.Formatter;

public class Message implements Serializable {

    private String command;
    private Person person;
    private String result;

    public Message(String command, Person person) {
        this.command = command;
        this.person = person;
    }

    public String getCommand() {
        return command;
    }
    public void setCommand(String executedCommand) {
        this.command = executedCommand;
    }
    public Person getPerson() {
        return person;
    }
    public void setPerson(Person person) {
        this.person = person;
    }
    public String getResult() {
        return result;
    }
    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return new Formatter()
                .format("Command: %s\nAttached object: %s\nResult: %s",
                        command, person, result)
                .toString();
    }
}

我在 Message obj 中包含此类的实例:

public class Person implements Serializable {
    private final int state;

    public Person(int state) {
        this.state = state;
    }

    @Override
    public String toString() {
        return "Person state: " + state;
    }
}

我不知道出了什么问题,希望得到您的帮助。

UPD:我使用 'org.apache.commons:commons-lang3:3.5' 依赖项将对象序列化为字节数组

【问题讨论】:

  • 我在这里没有看到最小的可调试版本:PersonMessage 丢失,Client 类中的字段 server 丢失,ServerTerminal 类中的字段 controller 丢失。我也看不到任何 main 方法显示您如何启动客户端和服务器。这就是为什么尽管有赏金,但到目前为止您还没有收到任何答案。
  • 它们很重要,因为没有它们,您的示例代码无法编译,没有编译我无法运行它,没有运行它我无法帮助您调试它。您将您的代码称为最小可调试示例,但不幸的是,这是一个错误的陈述。样本小于最小值。
  • 看,我想在这里帮你。所以请采纳我的建议,学习如何通过MCVE 提出一个好的问题。我相信如果你的问题是可重现的,你会很快得到你正在寻找的答案。但是现在您的示例类只是伪代码。即使不考虑缺少的类,代码也使用了类中不存在的两个字段。你故意隐藏信息,因为你认为你知道问题出在哪里。你怎么能这么肯定?为什么让重现问题变得更加困难?
  • 好吧,在您的示例代码中,客户端和服务器的缓冲区大小不同。如果我两边都使用64M或1M,我可以重现问题。晚上剩下的时间我会很忙,但我想明天我可以看看。
  • 你最好不要丢掉SocketChannel和Apache的东西,直接使用SocketsObjectInput/OutputStreams。然后至少你可以确切地看到发生了什么。将非阻塞模式与序列化结合起来几乎是不可能的,这无关紧要。我不会尝试它,而且我是一名 24 年的 Java 程序员。

标签: java serialization nio bytebuffer socketchannel


【解决方案1】:

我以前从未使用过 Java NIO 通道,所以我不是专家。但我发现了几件事:

一般:

  • 为了调试您的代码,使用e.printStackTrace() 而不是仅仅使用System.out.println(e.getMessage()) 会很有帮助。

客户:

  • 客户端中的SocketChannel server应该配置为阻塞,否则可能会因为服务器还没有响应而读取到0字节,导致你的问题。
  • 您应该总是在阅读之前致电ByteBuffer.clear(),而不是之后。
  • 读取后,在调用get(byte[])之前,必须通过responseBuffer.position(0)将字节缓冲区中的位置重置为0,否则它将读取刚刚读取的字节之后的未定义字节。
  • 您应该根据读取的字节数来调整字节数组的大小,而不是字节缓冲区的大小。反之亦然,但效率低下。

服务器:

  • 您应该总是在阅读之前致电ByteBuffer.clear(),而不是之后。
  • 读取后,在调用get(byte[])之前,必须通过responseBuffer.position(0)将字节缓冲区中的位置重置为0,否则它将读取刚刚读取的字节之后的未定义字节。
  • 当在getRequest(key) 调用期间捕获异常时,您应该关闭相应的通道,否则在客户端断开连接后服务器将无限期地尝试从中读取,从而向您的控制台日志发送错误消息。我的修改处理了这种情况,并打印了一条很好的日志消息,告诉哪个客户端(远程套接字地址)已关闭。

警告:您的代码中没有任何内容处理写入通道一侧的请求或响应大于另一侧的最大ByteBuffer 大小的情况。同样,理论上(反)序列化的byte[] 最终也可能比字节缓冲区大。

这是我的差异:

Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (date 1612321383172)
@@ -15,7 +15,7 @@
   public void start() throws IOException {
     try {
       server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
-      server.configureBlocking(false);
+      server.configureBlocking(true);
     }
     catch (IOException e) {
       System.err.println("Server isn't responding");
@@ -56,22 +56,24 @@
       getResponse();
     }
     catch (Exception e) {
-      System.out.println(e.getMessage());
+      e.printStackTrace();
+//      System.out.println(e.getMessage());
       System.err.println("Connection lost");
       System.exit(0);
     }
   }
 
   public void getResponse() throws Exception {
-    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
+    responseBuffer.clear();
 
     int read = server.read(responseBuffer);
-    responseBuffer.clear();
     if (read == -1) {
-      throw new Exception();
+      throw new Exception("EOF, cannot read server response");
     }
 
-    byte[] bytes = new byte[responseBuffer.limit()];
+    byte[] bytes = new byte[read];
+    responseBuffer.position(0);
     responseBuffer.get(bytes);
 
     Message message = SerializationUtils.deserialize(bytes);
Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (date 1612323386278)
@@ -35,7 +35,11 @@
             getRequest(key);
           }
           catch (Exception e) {
-            System.err.println(e.getMessage());
+            e.printStackTrace();
+//            System.err.println(e.getMessage());
+            SocketChannel client = (SocketChannel) key.channel();
+            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
+            client.close();
           }
         }
         iter.remove();
@@ -45,15 +49,16 @@
 
   private void getRequest(SelectionKey key) throws Exception {
     SocketChannel client = (SocketChannel) key.channel();
-    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
+    requestBuffer.clear();
     int read = client.read(requestBuffer);
-    requestBuffer.clear();
     if (read == -1) {
       key.cancel();
       throw new Exception("Client disconnected at: " +
         ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
     }
-    byte[] bytes = new byte[requestBuffer.limit()];
+    byte[] bytes = new byte[read];
+    requestBuffer.position(0);
     requestBuffer.get(bytes);
     Message message = SerializationUtils.deserialize(bytes);
     sendResponse(client, message);

为了完整起见,以下是我更改后的完整课程:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Client {

  private SocketChannel server;

  public void start() throws IOException {
    try {
      server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
      server.configureBlocking(true);
    }
    catch (IOException e) {
      System.err.println("Server isn't responding");
      System.exit(0);
    }

    Scanner scRequest = new Scanner(System.in);
    Scanner scState = new Scanner(System.in);

    System.out.println("Enter request:");
    String request = scRequest.nextLine();

    while (!request.equals("exit")) {
      try {
        // In my actual project class Person is a way different (But it's still a POJO)
        // I included it here to make sure I can get it back after sending to the server
        System.out.println("Enter a number:");
        Person person = new Person(scState.nextInt());
        sendRequest(request, person);

        System.out.println("\nEnter request:");
        request = scRequest.nextLine();
      }
      catch (Exception e) {
        e.printStackTrace();
      }
    }

    stop();
  }

  public void sendRequest(String sMessage, Person person) {
    Message message = new Message(sMessage, person);
    ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    try {
      server.write(requestBuffer);
      requestBuffer.clear();
      getResponse();
    }
    catch (Exception e) {
      e.printStackTrace();
//      System.out.println(e.getMessage());
      System.err.println("Connection lost");
      System.exit(0);
    }
  }

  public void getResponse() throws Exception {
    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
    responseBuffer.clear();

    int read = server.read(responseBuffer);
    if (read == -1) {
      throw new Exception("EOF, cannot read server response");
    }

    byte[] bytes = new byte[read];
    responseBuffer.position(0);
    responseBuffer.get(bytes);

    Message message = SerializationUtils.deserialize(bytes);
    System.out.println(message);
  }

  public void stop() throws IOException {
    server.close();
  }

  public static void main(String[] args) throws IOException {
    Client client = new Client();
    client.start();
  }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
  public void start() throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocket = ServerSocketChannel.open();
    serverSocket.bind(new InetSocketAddress("localhost", 5454));
    serverSocket.configureBlocking(false);
    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("Server started");

    while (true) {
      selector.select();
      Set<SelectionKey> selectedKeys = selector.selectedKeys();
      Iterator<SelectionKey> iter = selectedKeys.iterator();
      while (iter.hasNext()) {
        SelectionKey key = iter.next();
        if (key.isAcceptable()) {
          register(selector, serverSocket);
        }
        if (key.isReadable()) {
          try {
            getRequest(key);
          }
          catch (Exception e) {
            e.printStackTrace();
//            System.err.println(e.getMessage());
            SocketChannel client = (SocketChannel) key.channel();
            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
            client.close();
          }
        }
        iter.remove();
      }
    }
  }

  private void getRequest(SelectionKey key) throws Exception {
    SocketChannel client = (SocketChannel) key.channel();
    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
    requestBuffer.clear();
    int read = client.read(requestBuffer);
    if (read == -1) {
      key.cancel();
      throw new Exception("Client disconnected at: " +
        ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
    }
    byte[] bytes = new byte[read];
    requestBuffer.position(0);
    requestBuffer.get(bytes);
    Message message = SerializationUtils.deserialize(bytes);
    sendResponse(client, message);
  }

  private void sendResponse(SocketChannel client, Message message) throws IOException {
    message.setResult("Some result");
    ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    while (responseBuffer.hasRemaining()) {
      client.write(responseBuffer);
    }
    responseBuffer.clear();
  }

  private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
    SocketChannel client = serverSocket.accept();
    client.configureBlocking(false);
    client.register(selector, SelectionKey.OP_READ);
    System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
  }

  public static void main(String[] args) throws Exception {
    new Server().start();
  }
}

【讨论】:

  • 非常感谢!我花了很多时间在这上面,但甚至不明白真正的原因。
  • 如果没有MCVE,我将无法调试它。通过提供它,你帮了自己最大的忙。至于调试,我混合使用了日志输出(也是异常的完整堆栈跟踪)和在调试器中单步执行代码,调查双方发送和接收的内容。
猜你喜欢
  • 1970-01-01
  • 2012-12-17
  • 1970-01-01
  • 2015-07-25
  • 1970-01-01
  • 1970-01-01
  • 2011-02-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多