【问题标题】:Standard term for a thread I/O reorder buffer?线程 I/O 重新排序缓冲区的标准术语?
【发布时间】:2010-05-28 21:46:04
【问题描述】:

我有一个案例,其中许多线程都同时生成数据,这些数据最终写入一个长的串行 file 流。我需要以某种方式序列化这些写入,以便以正确的顺序写入流。

ie,我有一个 2048 个作业的输入队列 j0..jn,每个作业都会产生一大块数据 o 。作业在 8 个线程上并行运行,但输出块必须以与相应输入块相同的顺序出现在流中 - 输出文件的顺序必须为 o0 o1o2...

这个问题的解决方案是不言而喻的:我需要某种缓冲区以正确的顺序累积和写入输出块,类似于Tomasulo's algorithm 中的 CPU 重新排序缓冲区,或者类似于 TCP 重组输出的方式 -在将它们传递到应用程序层之前的无序数据包。

在开始编写代码之前,我想快速搜索一下文献,看看是否有任何论文以特别聪明或有效的方式解决了这个问题,因为我有严重的实时和内存限制。不过,我似乎找不到任何描述这一点的论文;对 [threads, concurrent, reorder buffer, reassembly, io, serialize] 的每个排列进行学术搜索并没有产生任何有用的信息。我觉得我一定不是在搜索正确的术语。

我可以搜索这种模式的通用学术名称或关键字吗?

【问题讨论】:

    标签: algorithm concurrency computer-science


    【解决方案1】:

    Enterprise Integration Patterns 书称其为 Resequencer (p282/web)。

    【讨论】:

      【解决方案2】:

      实际上,您不需要累积块。大多数操作系统和语言都提供随机访问文件抽象,允许每个线程独立地将其输出数据写入文件中的正确位置,而不会影响任何其他线程的输出数据。

      或者您正在写入真正的串行输出文件,如套接字?

      【讨论】:

      • 真正的串行——流密码。
      • 只有在处理完成之前知道输出记录的长度时,您的解决方案才有效。
      【解决方案3】:

      就个人而言,我根本不会使用可重新排序的缓冲区。我会为每个作业创建一个“作业”对象,并且根据您的环境,使用消息传递或互斥锁按顺序从每个作业接收完成的数据。如果下一个工作没有完成,你的“作家”进程会一直等到它完成。

      【讨论】:

      • 恐怕我不明白你的意思。你的意思是我应该有(n)多个互斥锁,每个工作一个,并且作者应该按升序等待每个互斥锁?这样做的问题是我的内存一次只能保存大约 20 个作业,如果我遇到当前窗口恰好以相反顺序完成的情况,这将使几个线程处于空闲状态,直到“头”一个完成。
      • 这就是我的建议,是的。如果任务以相反的顺序完成,我认为没有任何其他解决方案会做得更好,除了史蒂夫的建议,如果你的记录是已知长度的,或者将完成的结果缓存到磁盘。
      【解决方案4】:

      我会使用与您正在使用的线程数具有相同长度的环形缓冲区。环形缓冲区也将具有相同数量的互斥锁。

      rinbuffer 还必须知道它写入文件的最后一个块的 id。它相当于你的 ringbuffer 的 0 索引。

      在添加到环形缓冲区时,检查是否可以写入,即设置了索引 0,然后您可以一次将多个块写入文件。

      如果未设置索引 0,则只需锁定当前线程等待。 -- 你也可以有一个比你的线程数多 2-3 倍的环形缓冲区,并且只在适当的时候锁定,即:当启动了足够多的作业来填满缓冲区时。

      不要忘记更新最后写的代码块;)

      您也可以在写入文件时使用双缓冲。

      【讨论】:

        【解决方案5】:

        让输出队列包含 期货 而不是实际数据。当您从输入队列中检索项目时,立即将相应的未来发布到输出队列(注意确保这保留了顺序 --- 见下文)。当工作线程处理完项目后,它可以设置未来的值。输出线程可以从队列中读取每个未来,并阻塞直到该未来准备好。如果后面的线程提前准备好,这根本不会影响输出线程,前提是未来是有序的。

        有两种方法可以确保输出队列中的期货顺序正确。第一种是使用单个互斥体从输入队列读取并写入输出队列。每个线程锁定互斥体,从输入队列中取出一个项目,将未来发布到输出队列并释放互斥体。

        第二个是有一个主线程从输入队列中读取,将未来发布到输出队列,然后将项目交给工作线程执行。

        在 C++ 中,使用单个互斥锁保护队列,如下所示:

        #include <thread>
        #include <mutex>
        #include <future>
        
        struct work_data{};
        struct result_data{};
        
        std::mutex queue_mutex;
        std::queue<work_data> input_queue;
        std::queue<std::future<result_data> > output_queue;
        
        result_data process(work_data const&); // do the actual work
        
        void worker_thread()
        {
            for(;;) // substitute an appropriate termination condition
            {
                std::promise<result_data> p;
                work_data data;
                {
                    std::lock_guard<std::mutex> lk(queue_mutex);
                    if(input_queue.empty())
                    {
                        continue;
                    }
                    data=input_queue.front();
                    input_queue.pop();
                    std::promise<result_data> item_promise;
                    output_queue.push(item_promise.get_future());
                    p=std::move(item_promise);
                }
                p.set_value(process(data));
            }
        }
        
        void write(result_data const&); // write the result to the output stream
        
        void output_thread()
        {
            for(;;) // or whatever termination condition
            {
                std::future<result_data> f;
                {
                    std::lock_guard<std::mutex> lk(queue_mutex);
                    if(output_queue.empty())
                    {
                        continue;
                    }
                    f=std::move(output_queue.front());
                    output_queue.pop();
                }
                write(f.get());
            }
        }
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2017-01-10
          • 2015-08-20
          • 2015-07-05
          • 1970-01-01
          • 1970-01-01
          • 2014-09-20
          • 2012-01-14
          • 1970-01-01
          相关资源
          最近更新 更多