【问题标题】:Java IO: Reading a file that is still being writtenJava IO:读取仍在写入的文件
【发布时间】:2015-06-18 08:42:35
【问题描述】:

我正在创建一个需要从仍在写入的文件中读取的程序。

主要问题是:如果读取和写入将使用在单独线程上运行的 InputStreamOutputStream 类执行,我需要注意哪些问题和边缘情况为了防止数据损坏?

如果有人想知道我是否考虑过其他不基于InputStream 的方法,答案是肯定的,我有,但不幸的是,在这个项目中这是不可能的,因为该程序使用的库仅适用于InputStreamOutputStream.

另外,一些读者问为什么这种复杂性是必要的。文件写完后为什么不读呢?

原因是效率。该程序将执行以下操作

  1. 下载一系列每个 1.5MB 的字节块。该程序将收到数千个这样的块,总大小可达 30GB。此外,为了最大化带宽,块会同时下载,因此它们可能会乱序到达
  2. 程序将在每个块到达后立即发送它们进行处理。请注意,它们将按顺序送去处理。如果块 m 在块 m-1 之前到达,它们将被缓冲在磁盘上,直到块 m-1 到达并被发送处理。
  3. 从块 0 到块 n 执行这些块的处理,直到处理完每个块
  4. 重新发回处理后的结果。

如果我们要等待整个文件传输完毕,将会给本应是实时系统的系统带来巨大的延迟。

【问题讨论】:

  • 我更想知道您为什么考虑同时读取和写入同一个文件。当然可以,但它会打开一堆潜在问题的蠕虫。
  • Jägermeister 是对的。仅仅因为库与 Input/OutputStreams 一起使用并不意味着您需要同时读取和写入文件。
  • @Kayaman 请查看我更新的帖子。
  • @Jägermeister 请查看我更新的帖子。另外,这些潜在问题究竟是什么?
  • @lolski 在内存中进行此处理是否有意义?由于您显然不需要在开始处理之前拥有整个 30GB,因此您似乎可以分块处理数据。如果库仅使用流,您始终可以创建自己合适的实现。您的示例代码并没有真正提供太多信息,因此很难提出一个体面的建议。

标签: java scala inputstream outputstream


【解决方案1】:

使用 RandomAccessFile。通过 getChannel 或这样的方法可以使用 ByteBuffer

您将无法“插入”或“删除”文件的中间部分。为此目的,您的原始方法会很好,但使用两个文件。

对于并发:为了保持同步,您可以维护文件的单个对象模型,在那里进行更改。只有待处理的更改需要保存在内存中,其他分层数据可以根据需要重新读取和重新解析。

【讨论】:

  • 我会调查的。是否有任何我们需要注意的文件系统特定问题?
  • 我一般会检查修改时间是否有外部干扰(比如启动一个应用两次)。 WatchService 接收这样的外部修改是一个不错的加分项,但不要认为 WatchService 是完美的:并非所有的更改方式都可能被检测到,例如通过剪贴板。在你获得一个基本的工作应用程序后,你可以考虑每个操作系统的文件属性、锁等。
【解决方案2】:

所以你的问题(正如你现在已经清除的那样)是在块#1 到达之前你不能开始处理,你需要缓冲每个块#N (N > 1) 直到你可以处理它们.

我会将每个块写入它们自己的文件并创建一个自定义InputStream,它将按顺序读取每个块。下载块文件时会被命名为chunk.1.downloading,当整个块被加载时,它将被重命名为chunk.1

自定义InputStream 将检查文件chunk.N 是否存在(其中N = 1...X)。如果没有,它将阻塞。每次一个块被完全下载后,InputStream 就会被通知,它会检查下载的块是否是下一个要处理的块。如果是,则正常读取,否则再次阻塞。

【讨论】:

  • 这种方法绝对有意义。但是,为了清楚起见,您能否具体说明为什么这种方法比我提出的方法更可取,即是什么因素导致上述“边读边写”操作不受欢迎?
  • 一个明显的优势是,在发生崩溃的情况下,您可以清楚地看到系统的状态,然后恢复运行。我可以想象将这些块保存为它们自己的 S3 文件也会有一些优势。这绝对是一个比试图让 2 个流相互追逐更直接的实现。
【解决方案3】:

你应该使用 PipedInputStream 和 PipedOutputStream:

static Thread newCopyThread(InputStream is, OutputStream os) {
    Thread t = new Thread() {
        @Override
        public void run() {
            byte[] buffer = new byte[2048];
            try {
                while (true) {
                    int size = is.read(buffer);
                    if (size < 0) break;
                    os.write(buffer, 0, size);
                }
                is.close();
                os.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
            }
        }
    };
    return t;
}

public void main(String[] args) throws IOException, InterruptedException {
    ByteArrayInputStream bi = new ByteArrayInputStream("abcdefg".getBytes());
    PipedInputStream is = new PipedInputStream();
    PipedOutputStream os = new PipedOutputStream(is);
    Thread p = newCopyThread(bi, os);
    Thread c = newCopyThread(is, System.out);
    p.start();
    c.start();
    p.join();
    c.join();
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-09-30
    • 2012-04-09
    • 1970-01-01
    • 2011-07-27
    • 2015-05-29
    • 2011-11-21
    • 1970-01-01
    • 2014-03-20
    相关资源
    最近更新 更多