【问题标题】:Flaws with PipedInputStream/PipedOutputStreamPipedInputStream/PipedOutputStream 的缺陷
【发布时间】:2012-03-18 00:52:36
【问题描述】:

我在 SO 上看到了两个答案,声称 Java 提供的 PipedInputStreamPipedOutputStream 类存在缺陷。但他们没有详细说明他们出了什么问题。他们真的有缺陷吗?如果有,以什么方式?我目前正在编写一些使用它们的代码,所以我想知道我是否走错路了。

One answer 说:

PipedInputStreamPipedOutputStream 已损坏(关于线程)。他们假设每个实例都绑定到一个特定的线程。这很奇怪。

在我看来,这既不奇怪也不破碎。或许作者心中还有一些其他的缺陷?

Another answer说:

在实践中,最好避免使用它们。我在 13 年中使用过一次,但我希望我没有。

但是那个作者记不起问题出在哪里了。


与所有类一样,尤其是在多线程中使用的类,如果您滥用它们,就会遇到问题。所以我不认为PipedInputStream 可能抛出的不可预测的"write end dead" IOException 是一个缺陷(未能连接到close() 是一个错误;有关更多信息,请参阅Daniel Ferbers 的文章Whats this? IOException: Write end dead) .还有哪些其他声称的缺陷?

【问题讨论】:

  • 这个stackoverflow.com/questions/484119/… 涵盖了它。它们并不是真正的“缺陷”,只是有点棘手,而且通常还有一种符号代码的味道,如果你 100% 确定你需要它们并且设计中没有错误,那么使用它们就没有真正的问题......
  • 快速浏览一下,因为我想使用一个。它至少是“Under Featured”,因为读取线程并没有真正等待写入线程写入完整的读取请求,并且如果写入器关闭它则以 EOF 异常中止等。它具有非常原始的线程处理和同步,并且要求缓冲区与最大读取请求一样大。

标签: java multithreading io


【解决方案1】:

它们没有缺陷。

与所有类一样,尤其是在多线程中使用的类,如果您滥用它们,就会遇到问题。 PipedInputStream 可以抛出的不可预测的“write end dead”IOException 不是缺陷(未能连接到 close() 是一个错误;请参阅 Daniel Ferbers 的文章 Whats this? IOException: Write end dead 以获取更多信息) .

【讨论】:

【解决方案2】:

我在我的项目中很好地使用了它们,它们对于动态修改流和传递它们非常有用。唯一的缺点似乎是 PipedInputStream 的缓冲区很短(大约 1024),而我的输出流在 8KB 左右。

它没有任何缺陷,而且效果很好。

-------- groovy 中的示例

public class Runner{


final PipedOutputStream source = new PipedOutputStream();
PipedInputStream sink = new PipedInputStream();

public static void main(String[] args) {
    new Runner().doit()
    println "Finished main thread"
}


public void doit() {

    sink.connect(source)

    (new Producer(source)).start()
    BufferedInputStream buffer = new BufferedInputStream(sink)
    (new Consumer(buffer)).start()
}
}

class Producer extends Thread {


OutputStream source
Producer(OutputStream source) {
    this.source=source
}

@Override
public void run() {

    byte[] data = new byte[1024];

    println "Running the Producer..."
    FileInputStream fout = new FileInputStream("/Users/ganesh/temp/www/README")

    int amount=0
    while((amount=fout.read(data))>0)
    {
        String s = new String(data, 0, amount);
        source.write(s.getBytes())
        synchronized (this) {
            wait(5);
        }
    }

    source.close()
}

}

class Consumer extends Thread{

InputStream ins

Consumer(InputStream ins)
{
    this.ins = ins
}

public void run()
{
    println "Consumer running"

    int amount;
    byte[] data = new byte[1024];
    while ((amount = ins.read(data)) >= 0) {
        String s = new String(data, 0, amount);
        println "< $s"
        synchronized (this) {
            wait(5);
        }
    }

}

}

【讨论】:

  • sink.close() 错过了吗?
  • 从 Java 6 开始,您可以为 PipedInputStream​ 指定自定义管道大小
【解决方案3】:

一个缺陷可能是作者没有明确的方式向读者表明它遇到了问题:

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);

new Thread(() -> {
    try {
        writeToOut(out);
        out.close();
    }
    catch (SomeDataProviderException e) {
        // Have to notify the reading side, but how?
    }
}).start();

readFromIn(in);

作者可以关闭out,但可能读者将其误解为数据结束。为了正确处理这个问题,需要额外的逻辑。如果提供手动断开管道的功能会更容易。

现在有JDK-8222924 请求手动断开管道的方法。

【讨论】:

    【解决方案4】:

    在我看来有一个缺陷。更准确地说,如果应该将数据泵入 PipedOutputStream 的线程在实际将单个字节写入流中之前过早死亡,则死锁的风险很高。这种情况下的问题是管道流的实现无法检测到损坏的管道。因此,从 PipedInputStream 读取的线程将在第一次调用 read() 时永远等待(即死锁)。

    断管检测实际上依赖于对 write() 的第一次调用,因为实现会延迟初始化写入端线程,并且只有从那个时间点开始,断管检测才会起作用。

    下面的代码重现了这种情况:

    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    
    import org.junit.Test;
    
    public class PipeTest
    {
        @Test
        public void test() throws IOException
        {
            final PipedOutputStream pout = new PipedOutputStream();
            PipedInputStream pin = new PipedInputStream();
    
            pout.connect(pin);
    
            Thread t = new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        if(true)
                        {
                            throw new IOException("asd");
                        }
                        pout.write(0); // first byte which never get's written
                        pout.close();
                    }
                    catch(IOException e)
                    {
                        throw new RuntimeException(e);
                    }
                }
            });
            t.start();
    
            pin.read(); // wait's forever, e.g. deadlocks
        }
    }
    

    【讨论】:

    • 您希望写入和读取操作在独立线程中进行。为了让消费者不会因为竞争条件而立即退出,最好在执行读取操作之前执行 pin.available() 检查以查看它是否大于 0。然后您可以重复执行可用和读取操作,直到再次可用 == 0。如果作者断开连接,您还希望对读取端进行异常处理。
    • 此测试代码在出现异常时无法关闭输出流。如果你用try (OutputStream pout_closed = pout) { ... }包围你的try块的内容,代码不会死锁。
    • @RobertCasey 您仍然必须等到available() 返回> 0,这与直接调用read() 相同。此外,当read() 调用上的读取线程已被阻塞时,您实际上无法进行太多异常处理。
    • @AlexQ 关闭管道将向读者表明已到达数据末尾。它可能会错误地处理这种情况,您需要额外的逻辑来确定是否实际到达终点或作者是否遇到问题。
    【解决方案5】:

    我在 JDK 实现中看到的缺陷是:

    1) 没有超时,读写器可以无限阻塞。

    2) 对何时传输数据的次优控制(应仅在刷新或循环缓冲区已满时进行)

    所以我创建了自己的来解决上述问题,(通过 ThreadLocal 传递的超时值):

    PipedOutputStream

    使用方法:

    PiedOutputStreamTest

    希望对你有帮助...

    【讨论】:

      猜你喜欢
      • 2023-04-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-06
      • 1970-01-01
      • 2017-10-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多