【问题标题】:How to let selector that socketchannel key change in java niojava - 如何让选择器在java nio中改变socketchannel键
【发布时间】:2011-11-12 21:52:31
【问题描述】:

我对java nio的用法有疑问,希望有很多java nio知识的人可以帮助我澄清一些误解。

我正在使用 java nio 套接字。使用 socketchannel.write() 可能会填满写缓冲区。在这种情况下,剩余的缓冲区会排队,并且 key 会更改为 OP_WRITE。我的一种情况是队列长度很长。每次调用 selector.select() 之前,我都会从另一个名为 pendingRequest 的队列中将键更改为 OP_WRITE。但是我发现由于读取速度很慢,在发送处理完成后,还有很多未写入的消息,它们仍在队列中。如何处理这个问题?

在我的代码中,我有两个写作的地方。一个来自生成器:当它有消息要发布时,它直接写入通道。如果缓冲区已满,数据将排队。第二个地方是调度器:当 key 可写时,它会回调 write() 以写入排队的数据。我想这两个部分可以竞争写作。我只是觉得我的代码缺少一些处理来配合两次写入。

有什么办法可以解决我上面提出的问题吗?我在我的代码中发现许多排队的数据无法写出。当密钥可写时,生成器可能会再次写入数据,这导致排队数据写入的更改较少。如何使这部分正确?谢谢

//在WriteListener()中,编写代码为以下三部分

   public synchronized int writeData(EventObject source) {      
    int n = 0; 
    int count = 0;

    SocketChannel socket = (SocketChannel)source.getSource();       
    ByteBuffer buffer = ((WriteEvent)source).getBuffer();   
    try {
        write(socket);
    } catch (IOException e1) {          
        e1.printStackTrace();
    }       

    while (buffer.position()>0) {   
        try {           
                buffer.flip();  
                n = socket.write(buffer);                                   
                if(n == 0) {
                        key.interestOps(SelectionKey.OP_WRITE);                         synchronized (this.pendingData) {  
                            List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket); 
                            if(queue == null) {
                                queue = new ArrayList<ByteBuffer>();
                                this.pendingData.put(socket, queue); 
                        }
                        queue.add(buffer);

                        logger.logInfo("queue length:" + queue.size());
                    }                                               
                    break;
                }               
                count += n; 

        } catch (IOException e) {               
            e.printStackTrace();
        }   finally {                       
            buffer.compact();              
        }
    }   

    if(buffer.position()==0) {                      
        key.interestOps(SelectionKey.OP_READ);                  
    }
            return count;   

}   

// ==== 这个写方法是用来写队列缓冲区的

  public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {        
    int n = 0; 
    int count = 0;

    SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());                
    while (wbuf.position()>0) {     
        try {           
            wbuf.flip();        

            n = sc.write(wbuf);             

            if(n == 0) {    
                   key.interestOps(SelectionKey.OP_WRITE);                                  
                    synchronized (this.pendingData) {  
                        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc); 
                        if(queue == null) {
                                queue = new ArrayList<ByteBuffer>();
                                this.pendingData.put(sc, queue); 
                        }
                        queue.add(wbuf);
                    }

                    break;
                }               
                count += n; 

        } catch (IOException e) {               
            e.printStackTrace();
        }   finally {               

            wbuf.compact();                
        }
    }   

    if(wbuf.position()==0) {    
        wbuf.clear();               
        key.interestOps(SelectionKey.OP_READ);          
    }

return n;       
}   

// ====这个方法是key.isWritable()为真时Dispatch的回调

public void write(SocketChannel socketChannel) throws IOException {         
   SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());     
    synchronized (this.pendingData) {             
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);              
        if(queue == null || queue.isEmpty()) {                 
            // We wrote away all data, so we're no longer interested                 
            // in writing on this socket. Switch back to waiting for  data.                 
            try {                     
                if (key!=null)                         
                    key.interestOps(SelectionKey.OP_READ);                 
            } catch(Exception ex) {                     
                if (key!=null)                         
                    key.cancel();                 
                }             
        }           

        // Write until there's not more data ...    
        int n = 0;
        while (queue != null && !queue.isEmpty()) {                 
            ByteBuffer buf = (ByteBuffer) queue.get(0);   
            // zero length write, break the loop and wait for next writable time 
            n = write(socketChannel, buf);

            logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms.");

            if(n==0)  {             
                break;
            }
                      queue.remove(0); 

        }        

 }   

【问题讨论】:

  • 请发布一些代码(最好是 SSCCE )来证明您遇到的问题。
  • 最后一种方法,即写入队列的方法,即使if (n == 0) 也总是从队列中删除缓冲区。您应该在 删除缓冲区之前进行该测试并中断,实际上您应该仅在缓冲区为空时才删除该缓冲区。目前您正在丢失数据。
  • 在我当前的代码中,remove (0) 是在测试写入的字节数之后放置的。目前代码可以工作,但性能不如它应该做的那么好。缓冲区满后下一个可写时间需要多长时间?

标签: java queue nio


【解决方案1】:

如果您的消费者速度太慢,唯一的选择可能是断开它们以保护您的服务器。您不希望一个不良消费者影响您的其他客户。

我通常会增加发送缓冲区的大小,如果它填满,我会关闭连接。这避免了在 Java 代码中处理未写入数据的复杂性,因为您真正要做的只是稍微扩展缓冲区。如果您增加发送缓冲区大小,您就是在透明地执行此操作。有可能您甚至不需要使用发送缓冲区大小,默认值通常约为 64 KB。

【讨论】:

  • 我使用以下内容来更改socketchannel缓冲区大小。但是为什么serverSocket只能设置receivebuffer大小。 socket.socket().setReceiveBufferSize(256 * 1024); socket.socket().setSendBufferSize(256 * 1024);
  • @susan ServerSocket 允许您更改接收缓冲区大小,该大小由接受的套接字继承,因此您可以将它们设置为 > 64k。如果您尝试对已接受的套接字执行此操作,它将失败,因为 > 64k 需要在连接握手期间协商的 TCP“窗口缩放”选项。出于同样的原因,如果您想在客户端套接字上设置大于 64k 的接收缓冲区,则必须在连接之前进行。您可以随时设置发送缓冲区,因为它不需要协议的帮助。
  • @susan 更正:它不会“失败”,但不会使用 64k 以上的部分,除非在连接之前设置了大小。
  • 有什么办法可以解决我上面提出的问题吗?我在我的代码中发现许多排队的数据无法写出。当密钥可写时,生成器可能会再次写入数据,这导致排队数据写入的更改较少。如何使这部分正确?谢谢。
  • 另一个建议您更改发送缓冲区大小的原因是假设您对消费者的控制较少,这就是为什么它的执行速度不够快,无法使用您发送的数据。我的“解决方案”是;记录警告并关闭与慢速消费者的连接。可能增加发送者缓冲区的原因是为了减少误报,即除非确实有问题,否则不会发生这种情况。
【解决方案2】:
  1. 您必须确保新数据在已等待写入的数据之后入队。

  2. 如果行为仍然存在,您实际上只有两个选择:要么以行为不当为由断开客户端,要么停止为其生成输出,直到积压清除。可能两者兼而有之。

您可以通过熟练地使用较长的 select() 超时来实现第一个。如果 select() 返回零,则意味着在超时期间没有注册通道或任何通道都没有发生任何事情,在这种情况下,您可能需要考虑与 所有 客户端断开连接。如果您有很多并发客户端过于笨拙而无法工作,那么您必须跟踪每个通道的最后一次选择时间,并断开最后活动时间过早的任何通道。

在该超时期限内,您可能希望在那个人阅读缓慢时停止为他生成输出。

“长”的精确定义留给读者作为练习,但十分钟作为第一个近似值出现在脑海中。

【讨论】:

  • 非常感谢您的帮助。它的工作原理是让消息在生成器端生成下一条消息之前写出。但是对于其他进程,如果缓冲区已满,则等待unitl有空间再次写入。
猜你喜欢
  • 1970-01-01
  • 2012-09-02
  • 1970-01-01
  • 2016-10-03
  • 1970-01-01
  • 2011-01-11
  • 2013-10-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多