【发布时间】:2011-11-12 21:52:31
【问题描述】:
我对java nio的用法有疑问,希望有很多java nio知识的人可以帮助我澄清一些误解。
我正在使用 java nio 套接字。使用 socketchannel.write() 可能会填满写缓冲区。在这种情况下,剩余的缓冲区会排队,并且 key 会更改为 OP_WRITE。我的一种情况是队列长度很长。每次调用 selector.select() 之前,我都会从另一个名为 pendingRequest 的队列中将键更改为 OP_WRITE。但是我发现由于读取速度很慢,在发送处理完成后,还有很多未写入的消息,它们仍在队列中。如何处理这个问题?
在我的代码中,我有两个写作的地方。一个来自生成器:当它有消息要发布时,它直接写入通道。如果缓冲区已满,数据将排队。第二个地方是调度器:当 key 可写时,它会回调 write() 以写入排队的数据。我想这两个部分可以竞争写作。我只是觉得我的代码缺少一些处理来配合两次写入。
有什么办法可以解决我上面提出的问题吗?我在我的代码中发现许多排队的数据无法写出。当密钥可写时,生成器可能会再次写入数据,这导致排队数据写入的更改较少。如何使这部分正确?谢谢
//在WriteListener()中,编写代码为以下三部分
public synchronized int writeData(EventObject source) {
int n = 0;
int count = 0;
SocketChannel socket = (SocketChannel)source.getSource();
ByteBuffer buffer = ((WriteEvent)source).getBuffer();
try {
write(socket);
} catch (IOException e1) {
e1.printStackTrace();
}
while (buffer.position()>0) {
try {
buffer.flip();
n = socket.write(buffer);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE); synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(socket, queue);
}
queue.add(buffer);
logger.logInfo("queue length:" + queue.size());
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
buffer.compact();
}
}
if(buffer.position()==0) {
key.interestOps(SelectionKey.OP_READ);
}
return count;
}
// ==== 这个写方法是用来写队列缓冲区的
public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {
int n = 0;
int count = 0;
SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());
while (wbuf.position()>0) {
try {
wbuf.flip();
n = sc.write(wbuf);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE);
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(sc, queue);
}
queue.add(wbuf);
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
wbuf.compact();
}
}
if(wbuf.position()==0) {
wbuf.clear();
key.interestOps(SelectionKey.OP_READ);
}
return n;
}
// ====这个方法是key.isWritable()为真时Dispatch的回调
public void write(SocketChannel socketChannel) throws IOException {
SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);
if(queue == null || queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for data.
try {
if (key!=null)
key.interestOps(SelectionKey.OP_READ);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
// Write until there's not more data ...
int n = 0;
while (queue != null && !queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
// zero length write, break the loop and wait for next writable time
n = write(socketChannel, buf);
logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms.");
if(n==0) {
break;
}
queue.remove(0);
}
}
【问题讨论】:
-
请发布一些代码(最好是 SSCCE )来证明您遇到的问题。
-
最后一种方法,即写入队列的方法,即使
if (n == 0)也总是从队列中删除缓冲区。您应该在 删除缓冲区之前进行该测试并中断,实际上您应该仅在缓冲区为空时才删除该缓冲区。目前您正在丢失数据。 -
在我当前的代码中,remove (0) 是在测试写入的字节数之后放置的。目前代码可以工作,但性能不如它应该做的那么好。缓冲区满后下一个可写时间需要多长时间?