【问题标题】:Multiple operations on asynchronous stream异步流上的多个操作
【发布时间】:2013-01-07 15:56:10
【问题描述】:

我正在接收一个要保存到光盘的文件,它具有最高优先级。但我想用其他两个操作“拆分”/“共享”这个流。

到目前为止,我的方法是拥有一个 MainStream,它可以创建从 MainStream 中的缓冲区读取的子流。

如果这是一种合适的方法,我需要一些方法来确定子流在流中的位置。我怎样才能做到这一点?
或者它是解决我的主要问题的更好方法吗?

【问题讨论】:

  • 我怀疑有更好的方法,但我不清楚你的瓶颈是什么。是您的磁盘驱动器让您变慢,还是 CPU 用于生成输出或您想要避免的系统调用?
  • 您是否需要在文件更新时能够读取流,或者您可以在之后读取文件内容?如果其他消费者在短期内无法跟上您获取数据的速度怎么办?
  • 我没有进行任何测量。但是我想要完成的不是将文件写入光盘然后再读取它。所以现在我检索文件并将其保存到光盘,然后我执行我的操作。我想做这些操作,同时将数据保存到磁盘。
  • 除非您知道自己的问题是什么,否则您实施的任何解决方案都可能使问题变得更糟而不是更好。你有 a) 资格和衡量你的问题是什么 b) 创建一个解决方案来解决这个问题。 c) 重新测试它以确保它改善了您的问题。创建一个没有问题的解决方案只会增加复杂性而无济于事。
  • 好吧。我明白你在说什么。但问题对我来说很清楚。我不想在不需要时从光盘读取。数据已经通过内存一次,我想在第一次进入内存时读取数据。

标签: java asynchronous stream inputstream


【解决方案1】:

如果I/O不是你的瓶颈,你可以使用多线程写文件。

下面的代码只是一个例子:

/**
 * @author lichengwu
 * @version 1.0
 * @created 2013-01-08 12:11 AM
 */
public class MultiWrite {

private static final int SIZE = 1024 * 1024 * 1024;

ExecutorService exec = Executors.newFixedThreadPool(5);

public void start() {
    final File source = new File("");
    long size = source.length();
    final File store = new File("");

    for (long position = 0; position < size; position = position + SIZE) {
        exec.execute(new WriteTask(source, store, position));
    }

}

public class WriteTask implements Runnable {

    private final File store;

    private final File source;

    private final long position;

    public WriteTask(File source, File store, long position) {
        this.store = store;
        this.position = position;
        this.source = source;
    }

    public void run() {
        try {

            RandomAccessFile in = new RandomAccessFile(source, "r");

            // lock part of store
            RandomAccessFile out = new RandomAccessFile(store, "rw");
            FileChannel channel = out.getChannel();
            FileLock lock;
            while (true) {
                try {
                    lock = channel.tryLock(position, SIZE, false);
                    break;
                } catch (Exception e) {
                    // deal with
                }

            }

            out.seek(position);
            in.seek(position);
            byte[] data = new byte[SIZE];
            in.read(data);
            out.write(data);
            // release
            lock.release();
            channel.close();
            out.close();
            in.close();
        } catch (IOException e) {
            // deal with
        }
    }
}
}

【讨论】:

    猜你喜欢
    • 2015-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-16
    • 1970-01-01
    • 1970-01-01
    • 2020-01-18
    • 2015-11-13
    相关资源
    最近更新 更多