【问题标题】:An approach to multithreaded file processing一种多线程文件处理的方法
【发布时间】:2016-01-29 09:43:54
【问题描述】:

我有一个相当大的文件(> 15 GB)(不管什么样的文件)。 我必须读取文件,对数据进行一些处理,然后将处理后的数据写入空白文件。 我分块做。每个块都包含某种标题,然后是数据。最简单的多块文件将包含:

Number of block bytes
Block bytes
Number of block bytes
Block bytes

因此,我创建了一个线程用于逐块读取文件,一些线程用于处理每个读取的块,并创建一个线程用于逐块写入处理过的数据。

我在管理这些线程时遇到了一些问题。

我不知道每个块的处理顺序,但我必须按照读取的顺序将块写入文件。

所以,我的问题是我必须使用哪种方法来管理多线程处理。

我想,如果我使用 producer concumer 模式可能会更好。在这种情况下,最好使用哪种数据结构来存储已经处理过的数据?我有一个猜测 - 基于数组的堆栈,我需要在开始写入之前排序一次。

但我不确定。所以,请帮我一个方法。

//sample of my code, but without any logic of threads managing

public class DataBlock
{
    public byte[] Data { get; }
    public long Index { get; }

    public DataBlock(byte[] data, long index)
    {
        this.Data = data;
        this.Index = index;
    }
}


int bufferSize = 1024*64; //65536
long processedBlockCounter = 0L;
MyStack<DataBlock> processedBlockStore = new MyStack<DataBlock>();

using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize))
{
    using (BufferedStream bs = new BufferedStream(fs, bufferSize))
    {
        byte[] buffer = new byte[bufferSize];
        int byteRead;
        while ((byteRead = bs.Read(buffer, 0, bufferSize)) > 0)
        {
            byte[] originalBytes;
            using (MemoryStream mStream = new MemoryStream())
            {
                mStream.Write(buffer, 0, byteRead);
                originalBytes = mStream.ToArray();
            }

            long dataBlockIndex = Interlocked.Increment(ref processedBlockCounter);

            Thread processThread = new Thread(() =>
            {
                byte[] processedBytes = MyProcessor.Process(originalBytes);
                DataBlock processedBlock = new DataBlock(processedBytes, processedBlockCounter);
                lock(processedBlockStore)
                {
                     processedBlockStore.Add(processedBlock);
                }
            });
            processThread.Start();
        }
    }
}

【问题讨论】:

  • 是否可以选择将 index 添加到用于将块数据保存在内存中的结构中?读取器将按顺序填充它,写入器将等待 right 下一个块写入。作为数据结构,我只需使用阻塞排序集合。
  • @AdrianoRepetti 我的处理没有任何顺序就完成了,所以我需要在写入之前进行排序,这就是我存储索引的原因。从阻塞排序集合中添加和删除有什么复杂性?每次添加后是否对元素进行排序?
  • 您的处理耗时吗?还是会很快完成?
  • 这取决于它是如何实现的。一般来说我会使用一个单链表,但这取决于具体的时间场景。即使是未排序的集合 + 用于存储最新收到的数据包索引的支持变量也可能有效(在这种情况下,您的插入性能更好,但写入性能更差,顺便说一句,涉及 I/O 我不在乎)。
  • @SriramSakthivel 当然,如果处理很快完成会更好 =)

标签: .net multithreading stream thread-safety producer-consumer


【解决方案1】:

您正在为每次迭代创建新线程。这不会扩大规模。我建议您改用 ThreadPool。首选方式是使用内部使用 ThreadPool 的 TPL。

由于您需要排序和并行处理,而且它们不能齐头并进,如果可以的话,您可以让您的代码完全同步。

如果您需要并行处理,鉴于您的文件大于 15 GB 并且您的处理也很耗时,我建议您使用以下 Fork-Join 策略。

  • 对您的文件进行分块
  • 用每个块启动一个任务
  • 让每个任务将输出写入一个名为 index.html 的临时文件。 1.txt2.txt
  • 等待所有任务完成
  • 最后读取这些临时文件并按顺序创建输出文件。
  • 当然要删除那些临时文件。你完成了。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-11
  • 1970-01-01
相关资源
最近更新 更多