【问题标题】:Writing to a file from multiple threads without lock从多个线程无锁写入文件
【发布时间】:2017-04-04 22:15:13
【问题描述】:

我需要将数据缓冲区逐个缓冲区写入来自不同线程的文件。为了避免锁定我正在写入不同的文件,比如'file_1','file_2',最后将所有这些文件合并到'file'。这种方法好吗?有没有更好的建议?

有些文件非常大,包含数千个缓冲区。因此,创建了数千个临时文件,然后合并和清理。

【问题讨论】:

  • 数千个临时文件使用数千个线程?或者...
  • 那么,为避免锁定,您现在有 2 或 3 倍的 I/O 吗?
  • 为什么要避免锁?
  • 我正在做一些分析以提高性能并避免争用。

标签: c# multithreading task-parallel-library .net-4.5


【解决方案1】:

我的直觉是按摩文件会很昂贵,而且管理数千个文件听起来很复杂且容易出错。

不如让一个专门的线程来写。其他线程只是将它们的消息添加到等待写入的队列中。虽然会有一些同步开销,但在锁中所做的实际工作非常少,只需将“指针”复制到消息到队列中即可。由于打开文件并写入文件可能比使用互斥锁更昂贵,因此您实际上可能会提高性能。

【讨论】:

  • 是的,我也考虑过这种方法。现在就试试。
【解决方案2】:

这是一个示例方法(没有错误处理!),展示了如何使用BlockingCollection 来管理要写入文件的缓冲区队列。

这个想法是你创建一个ParallelFileWriter,然后在所有想要写入文件的线程中使用它。完成后,只需释放它(但请确保在所有线程都完成写入之前不要释放它!)。

这只是一个让您入门的简单示例 - 您需要添加参数检查和错误处理:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class ParallelFileWriter: IDisposable
    {
        // maxQueueSize is the maximum number of buffers you want in the queue at once.
        // If this value is reached, any threads calling Write() will block until there's
        // room in the queue.

        public ParallelFileWriter(string filename, int maxQueueSize)
        {
            _stream     = new FileStream(filename, FileMode.Create);
            _queue      = new BlockingCollection<byte[]>(maxQueueSize);
            _writerTask = Task.Run(() => writerTask());
        }

        public void Write(byte[] data)
        {
            _queue.Add(data);
        }

        public void Dispose()
        {            
            _queue.CompleteAdding();
            _writerTask.Wait();
            _stream.Close();
        }

        private void writerTask()
        {
            foreach (var data in _queue.GetConsumingEnumerable())
            {
                Debug.WriteLine("Queue size = {0}", _queue.Count);
                _stream.Write(data, 0, data.Length);
            }
        }

        private readonly Task _writerTask;
        private readonly BlockingCollection<byte[]> _queue;
        private readonly FileStream _stream;
    }

    class Program
    {
        private void run()
        {
            // For demo purposes, cancel after a couple of seconds.

            using (var fileWriter = new ParallelFileWriter(@"C:\TEST\TEST.DATA", 100))
            using (var cancellationSource = new CancellationTokenSource(2000))
            {
                const int NUM_THREADS = 8;
                Action[] actions = new Action[NUM_THREADS];

                for (int i = 0; i < NUM_THREADS; ++i)
                {
                    int id = i;
                    actions[i] = () => writer(cancellationSource.Token, fileWriter, id);
                }

                Parallel.Invoke(actions);
            }
        }

        private void writer(CancellationToken cancellation, ParallelFileWriter fileWriter, int id)
        {
            int index = 0;

            while (!cancellation.IsCancellationRequested)
            {
                string text = string.Format("{0}:{1}\n", id, index++);
                byte[] data = Encoding.UTF8.GetBytes(text);
                fileWriter.Write(data);
            }
        }

        static void Main(string[] args)
        {
            new Program().run();
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-19
    • 2015-08-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多