【问题标题】:Reading first N bytes of a file as an InputStream in Java?在Java中将文件的前N个字节作为InputStream读取?
【发布时间】:2015-01-23 21:47:46
【问题描述】:

在我的一生中,我一直无法找到与我正在尝试做的事情相匹配的问题,所以我将在这里解释我的用例。如果您知道某个主题已经涵盖了此问题的答案,请随时将我引向该主题。 :)

我有一段代码定期(每 20 秒)将文件上传到 Amazon S3。该文件是由另一个进程写入的日志文件,因此此功能实际上是一种跟踪日志的方法,以便某人可以半实时读取其内容,而无需直接访问日志所在的机器.

直到最近,我一直只是使用 S3 PutObject 方法(使用文件作为输入)来执行此上传。但在 AWS SDK 1.9 中,这不再有效,因为如果实际上传的内容大小大于在上传开始时承诺的内容长度,S3 客户端会拒绝请求。此方法在开始流式传输数据之前读取文件的大小,因此鉴于此应用程序的性质,文件的大小很可能在该点和流结束之间增加。这意味着我现在需要确保无论文件有多大,我只发送 N 个字节的数据。

我不需要以任何方式解释文件中的字节,所以我不关心编码。我可以逐字节传输它。基本上,我想要的是一种简单的方法,我可以将文件读取到第 N 个字节,然后让它终止读取,即使文件中有更多数据超过该点也是如此。 (换句话说,在特定点将 EOF 插入流中。)

例如,如果我的文件在开始上传时为 10000 字节长,但在上传期间增长到 12000 字节,那么无论大小发生什么变化,我都希望在 10000 字节时停止上传。 (在随后的上传中,我会上传 12000 字节或更多。)

我还没有找到一个预制的方法来做到这一点 - 到目前为止我发现的最好的方法似乎是 IOUtils.copyLarge(InputStream, OutputStream, offset, length),它可以被告知最多复制提供的 OutputStream 的“长度”字节。但是,copyLarge 是一种阻塞方法,PutObject 也是一种阻塞方法(大概在其 InputStream 上调用了一种 read() 形式),所以我似乎根本无法让它工作。

我还没有找到任何方法或预构建的流可以做到这一点,所以这让我觉得我需要编写自己的实现来直接监控已读取的字节数。这可能会像 BufferedInputStream 一样工作,其中每批读取的字节数是缓冲区大小或要读取的剩余字节中的较小者。 (例如,缓冲区大小为 3000 字节,我会分三批,每批 3000 字节,然后是 1000 字节 + EOF 的批次。)

有人知道更好的方法吗?谢谢。

编辑澄清一下,我已经知道几个替代方案,但都不是理想的:

(1) 我可以在上传文件时锁定文件。这样做会导致数据丢失或在写入文件的过程中出现操作问题。

(2) 我可以在上传之前创建文件的本地副本。这可能会非常低效并占用大量不必要的磁盘空间(此文件可能会增长到几 GB 的范围,并且运行它的机器可能磁盘空间不足)。

编辑 2:根据同事的建议,我的最终解决方案如下所示:

private void uploadLogFile(final File logFile) {
    if (logFile.exists()) {
        long byteLength = logFile.length();
        try (
            FileInputStream fileStream = new FileInputStream(logFile);
            InputStream limitStream = ByteStreams.limit(fileStream, byteLength);
        ) {
            ObjectMetadata md = new ObjectMetadata();
            md.setContentLength(byteLength);
            // Set other metadata as appropriate.
            PutObjectRequest req = new PutObjectRequest(bucket, key, limitStream, md);
            s3Client.putObject(req);
        } // plus exception handling
    }
}

LimitInputStream 是我的同事建议的,显然不知道它已被弃用。 ByteStreams.limit 是当前的 Guava 替代品,它可以满足我的需求。谢谢大家。

【问题讨论】:

  • 为什么执行阻塞 I/O 会出现问题?尤其是考虑到你以前就是这样做的?
  • 您可以只用很少的代码行扩展FilterInputStream,使其在读取最多 N 个字节后假装存在 EOF 条件。
  • @5gon12eder 扩展 FilterInputStream 或其子类之一是否有意义,例如。缓冲输入流?这种方法听起来像是我倾向于的方法。
  • 不,实际上,FilterInputStream 对这项特定的工作根本没有帮助,因为要正确地完成它,无论如何您都必须重写几乎所有的 InputStream 方法。覆盖java.io.InputStream 本身并不难。
  • 您从未想过编写十几行代码来实现这一点的可能性?

标签: java inputstream filestream


【解决方案1】:

完整答案 rip & replace:

包装InputStream 相对简单,例如在发出数据结束信号之前限制它将传递的字节数。 FilterInputStream 是针对这种一般类型的工作,但由于您必须为这个特定 工作重写几乎所有方法,它只是妨碍了。

这是一个粗略的解决方案:

import java.io.IOException;
import java.io.InputStream;

/**
 * An {@code InputStream} wrapper that provides up to a maximum number of
 * bytes from the underlying stream.  Does not support mark/reset, even
 * when the wrapped stream does, and does not perform any buffering.
 */
public class BoundedInputStream extends InputStream {

    /** This stream's underlying @{code InputStream} */
    private final InputStream data;

    /** The maximum number of bytes still available from this stream */ 
    private long bytesRemaining;

    /**
     * Initializes a new {@code BoundedInputStream} with the specified
     * underlying stream and byte limit
     * @param data the @{code InputStream} serving as the source of this
     *        one's data
     * @param maxBytes the maximum number of bytes this stream will deliver
     *        before signaling end-of-data
     */
    public BoundedInputStream(InputStream data, long maxBytes) {
        this.data = data;
        bytesRemaining = Math.max(maxBytes, 0);
    }

    @Override
    public int available() throws IOException {
        return (int) Math.min(data.available(), bytesRemaining);
    }

    @Override
    public void close() throws IOException {
        data.close();
    }

    @Override
    public synchronized void mark(int limit) {
        // does nothing
    }

    @Override
    public boolean markSupported() {
        return false;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (bytesRemaining > 0) {
            int nRead = data.read(
                    buf, off, (int) Math.min(len, bytesRemaining));

            bytesRemaining -= nRead;

            return nRead;
        } else {
            return -1;
        }
    }

    @Override
    public int read(byte[] buf) throws IOException {
        return this.read(buf, 0, buf.length);
    }

    @Override
    public synchronized void reset() throws IOException {
        throw new IOException("reset() not supported");
    }

    @Override
    public long skip(long n) throws IOException {
        long skipped = data.skip(Math.min(n, bytesRemaining));

        bytesRemaining -= skipped;

        return skipped;
    }

    @Override
    public int read() throws IOException {
        if (bytesRemaining > 0) {
            int c = data.read();

            if (c >= 0) {
                bytesRemaining -= 1;
            }

            return c;
        } else {
            return -1;
        }
    }
}

【讨论】:

  • 所以,这可以正常工作(并且 IOUtils 提供了几乎完全相同的实现),除了 S3 PutObject 方法没有让我对如何读取流进行任何控制。它大概只是调用 InputStream.read() 直到流返回 EOF。所以我需要一种控制发送给它的数据的方法。但是你那里的一部分代码看起来就像我已经在考虑做的那样 - 我主要只是想确保我没有重新发明轮子。
  • 所以我显然误解了你。我以为您正在寻找PutObject替代方案,还有其他方法可以获取适当的OutputStream 以将字节定向到该位置。对于要插入PutObject 的东西,您最好的选择可能是@5gon12eder 建议的方法。或许我可以提供一些细节。
  • AFAIK,我仅限于使用 SDK 中的 PutObject 方法,或者编写我自己的客户端以更直接地流式传输到服务器。后者肯定是在重新发明轮子,也意味着上传过程的其他部分,即在输出文件时加密文件,也必须重新实现。不,我真的只是想控制通过 PutObject 发送的数据量。听起来我需要一个自定义的 InputStream 来做到这一点,尽管基于 FilterInputStream 或 BufferedInputStream 应该会相对容易。
  • 为此目的自定义InputStream无论如何都相对容易。请参阅我修改后的答案。
  • 感谢修改后的答案。这看起来简单明了——我会在下周回到我的办公桌时尝试一下,让你知道它是如何工作的。 :)
猜你喜欢
  • 2013-08-14
  • 1970-01-01
  • 1970-01-01
  • 2018-06-28
  • 2020-06-12
  • 2015-07-11
  • 1970-01-01
  • 2012-12-25
  • 2017-12-12
相关资源
最近更新 更多