【问题标题】:Piped Input stream is getting locked管道输入流被锁定
【发布时间】:2012-02-07 05:55:17
【问题描述】:

我正在尝试使用管道输入流写入数据。但从线程转储看来,管道输入流上存在锁定。

PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
FileInputStream fis = null;
GZIPOutputStream gos = null;
byte[] buffer = new byte[1024];
try {
    fis = new FileInputStream(file);
    gos = new GZIPOutputStream(pos);
    int length;
    while ((length = fis.read(buffer, 0, 1024)) != -1)
        gos.write(buffer, 0, length);
    } catch(Exception e){
        print("Could not read the file");
    }
    finally {
        try {
            fis.close();
            gos.close();
        }catch (Exception ie){ 
            printException(ie);
        }
    }
writeObject(pis);
pos.close();

writeobj 方法只会从流中读取,但 read 方法会被锁定。 线程转储表明管道输入流有一些等待。

main" prio=10 tid=0x08066000 nid=0x48d2 in Object.wait() [0xb7fd2000..0xb7fd31e8]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0xa5c28be8> (a java.io.PipedInputStream)
    at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:257)
    at java.io.PipedInputStream.receive(PipedInputStream.java:215)
    - locked <0xa5c28be8> (a java.io.PipedInputStream)
    at java.io.PipedOutputStream.write(PipedOutputStream.java:132)
    at java.util.zip.GZIPOutputStream.finish(GZIPOutputStream.java:95)
    at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:146)

   Locked ownable synchronizers:
    - None

我不确定是谁把它锁起来了。阅读文档以找出锁定调用。但无法弄清楚出了什么问题以及如何克服它。

【问题讨论】:

    标签: java stream


    【解决方案1】:

    PipedInputStream 有一个小的非扩展缓冲区。一旦缓冲区已满,就写入 PipedOutputStream 块,直到缓冲的输入被由不同的线程读取。您不能在同一个线程中使用这两个,因为写入将等待无法发生的读取。

    在您的情况下,您在写入所有数据之前不会读取任何数据,因此解决方案是使用 ByteArrayOutputStreamByteArrayInputStream 代替:

    1. 将所有数据写入 ByteArrayOutputStream。
    2. 完成后,在流上调用 toByteArray() 以检索字节数据。
    3. (可选)使用要从其中读取的字节数据创建一个 ByteArrayInputStream 作为 InputStream。

    【讨论】:

    • 如果数据以千兆字节为单位怎么办? (应用程序肯定会崩溃)创建流以防止分配大内存块。
    • @HamidVakilian 在这种情况下它确实会崩溃,但在这种情况下 OP 正在尝试将完整数据存储在 RAM 中,所以我认为他们知道它会适合。
    【解决方案2】:

    使用 PipedInputStream 和 PipedOutputStream 必须在不同的线程中。

    仔细阅读 Javadoc: http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html

    通常,数据由一个线程从 PipedInputStream 对象读取,数据由其他线程写入相应的 PipedOutputStream。不建议尝试在单个线程中同时使用这两个对象,因为这可能会使线程死锁。

    【讨论】:

      【解决方案3】:

      我需要一个过滤器来拦截慢速连接,我需要尽快关闭数据库连接,所以我最初使用 Java 管道,但是当仔细观察它们的实现时,它都是同步的,所以我最终使用一个小缓冲区创建了自己的 QueueInputStream 和阻塞队列将缓冲区放入队列中一次已满,它是无锁的,除非 LinkedBlockingQueue 使用的锁定条件在小缓冲区的帮助下应该很便宜,此类仅用于单个每个实例的生产者和消费者:

      import java.io.IOException;
      import java.io.OutputStream;
      import java.util.concurrent.*;
      
      public class QueueOutputStream extends OutputStream
      {
        private static final int DEFAULT_BUFFER_SIZE=1024;
        private static final byte[] END_SIGNAL=new byte[]{};
      
        private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
        private final byte[] buffer;
      
        private boolean closed=false;
        private int count=0;
      
        public QueueOutputStream()
        {
          this(DEFAULT_BUFFER_SIZE);
        }
      
        public QueueOutputStream(final int bufferSize)
        {
          if(bufferSize<=0){
            throw new IllegalArgumentException("Buffer size <= 0");
          }
          this.buffer=new byte[bufferSize];
        }
      
        private synchronized void flushBuffer()
        {
          if(count>0){
            final byte[] copy=new byte[count];
            System.arraycopy(buffer,0,copy,0,count);
            queue.offer(copy);
            count=0;
          }
        }
      
        @Override
        public synchronized void write(final int b) throws IOException
        {
          if(closed){
            throw new IllegalStateException("Stream is closed");
          }
          if(count>=buffer.length){
            flushBuffer();
          }
          buffer[count++]=(byte)b;
        }
      
        @Override
        public synchronized void write(final byte[] b, final int off, final int len) throws IOException
        {
          super.write(b,off,len);
        }
      
        @Override
        public synchronized void close() throws IOException
        {
          flushBuffer();
          queue.offer(END_SIGNAL);
          closed=true;
        }
      
        public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
        {
          return executor.submit(
                  new Callable<Void>()
                  {
                    @Override
                    public Void call() throws Exception
                    {
                      try{
                        byte[] buffer=queue.take();
                        while(buffer!=END_SIGNAL){
                          outputStream.write(buffer);
                          buffer=queue.take();
                        }
                        outputStream.flush();
                      } catch(Exception e){
                        close();
                        throw e;
                      } finally{
                        outputStream.close();
                      }
                      return null;
                    }
                  }
          );
        }
      

      【讨论】:

        猜你喜欢
        • 2012-01-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2010-09-26
        • 1970-01-01
        • 2011-05-12
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多