【问题标题】:Java networking: evented Socket/InputStreamJava 网络:事件 Socket/InputStream
【发布时间】:2025-12-15 13:30:01
【问题描述】:

我正在 Java 的 Sockets 上实现一个面向事件的层,我想知道是否有办法确定是否有待读取的数据。

我的正常方法是从套接字读取到缓冲区,并在缓冲区填充给定数量的字节时调用提供的回调(如果每次到达时都需要触发回调,则可能为 0 ),但我怀疑 Java 已经在为我做缓冲了。

InputStream 的available() 方法对此是否可靠?我是否应该只使用read() 并在 Socket 之上进行自己的缓冲?还是有别的办法?

【问题讨论】:

    标签: java events sockets networking


    【解决方案1】:

    简而言之,不。 available() 不可靠(至少不适合我)。我建议使用java.nio.channels.SocketChannelSelectorSelectionKey 连接。这个解决方案有点基于事件,但比普通的套接字更复杂。

    对于客户:

    1. 构造套接字通道(socket),打开一个选择器(selector = Selector.open();)。
    2. 使用非阻塞socket.configureBlocking(false);
    3. 注册连接选择器socket.register(selector, SelectionKey.OP_CONNECT);
    4. 连接socket.connect(new InetSocketAddress(host, port));
    5. 看看有没有新的selector.select();
    6. 如果“new”表示连接成功,则注册OP_READ的选择器;如果“新”指的是可用数据,则只需从套接字读取。

    但是,为了使其异步,您需要设置一个单独的线程(尽管套接字被创建为非阻塞,但线程无论如何都会阻塞)来检查是否有东西到达。

    对于服务器,有ServerSocketChannel,您使用OP_ACCEPT

    作为参考,这是我的代码(客户端),应该给你一个提示:

     private Thread readingThread = new ListeningThread();
    
     /**
      * Listening thread - reads messages in a separate thread so the application does not get blocked.
      */
     private class ListeningThread extends Thread {
      public void run() {
       running = true;
       try {
        while(!close) listen();
        messenger.close();
       }
       catch(ConnectException ce) {
        doNotifyConnectionFailed(ce);
       }
       catch(Exception e) {
    //    e.printStackTrace();
        messenger.close();
       }
       running = false;
      }
     }
    
     /**
      * Connects to host and port.
      * @param host Host to connect to.
      * @param port Port of the host machine to connect to.
      */
     public void connect(String host, int port) {
      try {
       SocketChannel socket = SocketChannel.open();
       socket.configureBlocking(false);
       socket.register(this.selector, SelectionKey.OP_CONNECT);
       socket.connect(new InetSocketAddress(host, port));
      }
      catch(IOException e) {
       this.doNotifyConnectionFailed(e);
      }
     }
    
     /**
      * Waits for an event to happen, processes it and then returns.
      * @throws IOException when something goes wrong.
      */
     protected void listen() throws IOException {
      // see if there are any new things going on
      this.selector.select();
      // process events
      Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      while(iter.hasNext()) {
       SelectionKey key = iter.next();
       iter.remove();
       // check validity
       if(key.isValid()) {
        // if connectable...
        if(key.isConnectable()) {
         // ...establish connection, make messenger, and notify everyone
         SocketChannel client = (SocketChannel)key.channel();
         // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast
         if(client!=null && client.finishConnect()) {
          client.register(this.selector, SelectionKey.OP_READ);
         }
        }
        // if readable, tell messenger to read bytes
        else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
         // read message here
        }
       }
      }
     }
    
     /**
      * Starts the client.
      */
     public void start() {
      // start a reading thread
      if(!this.running) {
       this.readingThread = new ListeningThread();
       this.readingThread.start();
      }
     }
    
     /**
      * Tells the client to close at nearest possible moment.
      */
     public void close() {
      this.close = true;
     }
    

    对于服务器:

     /**
      * Constructs a server.
      * @param port Port to listen to.
      * @param protocol Protocol of messages.
      * @throws IOException when something goes wrong.
      */
     public ChannelMessageServer(int port) throws IOException {
      this.server = ServerSocketChannel.open();
      this.server.configureBlocking(false);
      this.server.socket().bind(new InetSocketAddress(port));
      this.server.register(this.selector, SelectionKey.OP_ACCEPT);
     }
    
     /**
      * Waits for event, then exits.
      * @throws IOException when something goes wrong.
      */
     protected void listen() throws IOException {
      // see if there are any new things going on
      this.selector.select();
      // process events
      Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      while(iter.hasNext()) {
       SelectionKey key = iter.next();
       // do something with the connected socket
       iter.remove();
       if(key.isValid()) this.process(key);
      }
     }
    
     /**
      * Processes a selection key.
      * @param key SelectionKey.
      * @throws IOException when something is wrong.
      */
     protected void process(SelectionKey key) throws IOException {
      // if incoming connection
      if(key.isAcceptable()) {
       // get client
       SocketChannel client = (((ServerSocketChannel)key.channel()).accept());
        try {
         client.configureBlocking(false);
         client.register(this.selector, SelectionKey.OP_READ);
        }
        catch(Exception e) {
         // catch
        }
      }
      // if readable, tell messenger to read
      else if(key.isReadable()) {
      // read
      }
     }
    

    希望这会有所帮助。

    【讨论】:

    • 我不明白。您不需要单独的线程。根据定义,非阻塞套接字不会阻塞。只需正确使用 OP_READ,并在读取返回零时停止正确的读取循环。
    • @EJP:不反对;但是在我看来,无论阻塞如何,从套接字读取仍然被阻塞,即使没有什么可读取的。不过,可能是我做错了什么。我建议提问者按照你说的进行尝试,如果它不起作用 - 尝试线程。
    • 您几乎可以肯定所做的是循环,而 read() 返回零。这就是我提到它的原因。那不是阻塞,那是循环。
    【解决方案2】:

    available() 只会告诉您是否可以在不进入操作系统的情况下读取数据。它在这里不是很有用。

    您可以根据需要进行阻塞或非阻塞读取。当没有数据要读取时,非阻塞读取才会返回,这可能是您想要的。

    【讨论】:

    • 不正确。 available() 将告诉您 BufferedInputStream/BufferedReader 中的数据总和,如果您使用的是一个,以及套接字接收缓冲区,这是一个内核数据结构。如果数据仅在套接字接收缓冲区中,则您必须“转到操作系统”才能获取它,但您不会在此过程中阻塞。正如Javadoc所说。但是,如果它是 SSLSocket,available() 总是返回零。