【问题标题】:Parsing large file with MPI in C++在 C++ 中使用 MPI 解析大文件
【发布时间】:2014-12-10 10:04:26
【问题描述】:

我有一个 C++ 程序,我想在其中解析一个巨大的文件,寻找一些我已经实现的正则表达式。该程序在按顺序执行时运行正常,但后来我想使用 MPI 运行它。

我通过将 master(协调执行的那个)与 workers(并行解析文件的那个)区分开来开始适应 MPI。 主要功能。像这样的:

MPI::Init(argc, argv);
...

if(rank == 0) {
    ...

    // Master sends initial and ending byte to every worker
    for(int i = 1; i < total_workers; i++) {
        array[0] = (i-1) * first_worker_file_part;
        array[1] = i * first_worker_file_part;
        MPI::COMM_WORLD.Send(array, 2, MPI::INT, i, 1);
    }
}

if(rank != 0)
    readDocument();

...

MPI::Finalize();

master 将向每个 worker 发送一个具有 2 个位置的数组,其中包含它将开始读取位置 0 中的文件的字节strong> 以及需要在 1 位置停​​止读取的字节。

readDocument() 函数现在看起来像这样(不是解析,只是每个 worker 读取他的文件部分):

void readDocument()
{
    array = new int[2];
    MPI::COMM_WORLD.Recv(array, 10, MPI::INT, 0, 1, status);
    int read_length = array[1] - array[0];
    char* buffer = new char [read_length];

    if (infile)
    {
        infile.seekg(array[0]); // Start reading in supposed byte
        infile.read(buffer, read_length);
    }
}

我尝试了不同的示例,从将读取的输出写入文件到使用不同数量的进程运行它。例如,当我使用 20 个进程而不是 10 个进程运行程序时,会发生两次读取文件的时间。我预计会是将近一半的时间,但我不知道为什么会这样。

另外,在另一件事上,我想让 master 等待所有 workers 完成他们的执行,然后打印最后的时间。有没有办法在工人处理时“阻止”他?就像 C pthreads 中的 cond_wait

【问题讨论】:

  • 您是否将文件放在并行文件系统上?否则所有读取都将有效地全局锁定。慢 2 倍可能有点奇怪,但如果你真的耗尽资源,它可能会发生。
  • @luk32:我在本地运行程序,所有进程都在我的计算机中。该文件与可执行文件存储在同一文件夹中。有没有办法一次只由一个进程读取文件?如果是,我该如何解决?
  • @HighPerformanceMark 我不太明白这个问题,对不起。我如何知道从进程到磁盘有多少通道?
  • @gd.silva 好吧,简而言之,你不能,至少不容易。您是否同时访问文件甚至不是每个进程都无关紧要,因为如果它只是读取您肯定会耗尽HDD CPU带宽,并且添加进程只会增加开销。我的意思是,如果所有东西都在一个 HDD 上,那么它只有一个管道,操作系统必须管理对它的许多读取请求。
  • 至于您的第二个问题,这与我提出的另一个问题完全不同——您为什么要这样做,让流程按照自己的甜蜜方式进行并获得最后一次有什么问题在你打电话给mpi_finalize 之后?

标签: c++ file mpi


【解决方案1】:

根据我的经验,在具有并行文件系统的计算机系统上工作的人往往了解这些并行文件系统,因此您的问题最初会将您标记为不使用此类系统的人。

如果没有特定的硬件支持,从单个文件读取归结为系统定位单个读取头并将一系列字节从磁盘读取到内存。许多现代文件系统(例如 RAID)的复杂现实并没有实质性地改变这种情况,RAID 实际上可以跨多个磁盘存储文件。当多个进程同时请求操作系统访问文件时,操作系统会根据某种概念(可能是公平的)分配磁盘访问权限,这样就不会出现任何进程被饿死的情况。在最坏的情况下,操作系统会花费大量时间在进程之间切换磁盘访问,以致读取率显着下降。就吞吐量而言,最有效的方法是单个进程一次读取整个文件,而其他进程执行其他操作。

这种情况,即多个进程争用稀缺的磁盘 i/o 资源,适用于这些进程是否属于并行、MPI(或类似)程序或同时运行的完全独立的程序。

影响就是您观察到的 - 而不是 10 个进程各自等待获得自己的 1/10 共享文件,而是有 20 个进程各自等待它们的 1/20 共享。哦,你哭了,但每个进程只读取一半的数据,所以整个团队应该花费相同的时间来获取文件。不,我回答说,您忘记添加操作系统在访问之间定位和重新定位读/写头所需的时间。读取时间包括延迟(发出请求后开始读取需要多长时间)和吞吐量(i/o 系统来回传递字节的速度)。

应该很容易对延迟和带宽做出一些合理的估计,这可以解释 20 个进程的读取时间是 10 个进程的两倍。

你怎么能解决这个问题?你不能,不能没有并行文件系统。但是您可能会发现让主进程读取整个文件然后将其打包出来比您当前的方法更快。您可能不会,您可能只是发现当前方法对您的整个计算来说是最快的。例如,如果读取时间占总计算时间的 10%,您可能会认为这是一个合理的开销。

【讨论】:

    【解决方案2】:

    要添加到高性能标记的正确答案,可以使用 MPI-IO 进行文件读取,向 IO 例程提供(在这种情况下)提示不要从每个处理器读取;但是,如果您移动到具有并行文件系统的集群,那么具有修改(或空)MPI_Info 的相同代码也应该能够利用并行文件系统。对于 MPI-IO 最常见的实现,Romio,描述可用提示的手册是 here;特别是,我们正在使用

    MPI_Info_set(info, "cb_config_list","*:1");
    

    将阅读器的数量设置为每个节点一个。下面的代码将让您尝试使用 MPI-IO 或 POSIX 读取文件(例如,seek)。

    #include <iostream>
    #include <fstream>
    #include <mpi.h>
    
    void partitionFile(const int filesize, const int rank, const int size,  
                       const int overlap, int *start, int *end) {
        int localsize = filesize/size;
        *start = rank * localsize;
        *end   = *start + localsize-1;
    
        if (rank != 0)      *start -= overlap;
        if (rank != size-1) *end   += overlap;
    }
    
    void readdataMPI(MPI_File *in, const int rank, const int size, const int overlap,
                   char **data, int *ndata) {
        MPI_Offset filesize;
        int start;
        int end;
    
        // figure out who reads what 
    
        MPI_File_get_size(*in, &filesize);
        partitionFile((int)filesize, rank, size, overlap, &start, &end);
    
        *ndata =  end - start + 1;
    
        // allocate memory
        *data = new char[*ndata + 1];
    
        // everyone reads in their part 
        MPI_File_read_at_all(*in, (MPI_Offset)start, *data, 
                             (MPI_Offset)(*ndata), MPI_CHAR, MPI_STATUS_IGNORE);
        (*data)[*ndata] = '\0';
    }
    
    void readdataSeek(std::ifstream &infile, int array[2], char *buffer)
    {
        int read_length = array[1] - array[0];
        if (infile)
        {
            infile.seekg(array[0]); // Start reading in supposed byte
            infile.read(buffer, read_length);
        }
    }
    
    int main(int argc, char **argv) {
    
        MPI_File in;
        int rank, size;
        int ierr;
    
        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);
    
        if (argc != 3) {
            if (rank == 0) 
                std::cerr << "Usage: " << argv[0] << " infilename [MPI|POSIX]" << std::endl;
            MPI_Finalize();
            return -1;
        }
    
        std::string optionMPI("MPI");
    
        if ( !optionMPI.compare(argv[2]) ) {
            MPI_Info info;
            MPI_Info_create(&info);
            MPI_Info_set(info, "cb_config_list","*:1"); // ROMIO: one reader per node
                                                        // Eventually, should be able to use io_nodes_list or similar
    
            ierr = MPI_File_open(MPI_COMM_WORLD, argv[1], MPI_MODE_RDONLY, info, &in);
            if (ierr) {
                if (rank == 0) 
                    std::cerr << "Usage: " << argv[0] << " Couldn't open file " << argv[1] << std::endl;
                MPI_Finalize();
                return -1;
            }
    
            const int overlap=1;
            char *data;
            int ndata;
            readdataMPI(&in, rank, size, overlap, &data, &ndata);
    
            std::cout << "MPI: Rank " << rank << " has " << ndata << " characters." << std::endl;
    
            delete [] data;
    
            MPI_File_close(&in);
            MPI_Info_free(&info);
        } else {
            int fsize;
            if (rank == 0) {
                std::ifstream file( argv[1], std::ios::ate );
                fsize=file.tellg();
                file.close();
            }
            MPI_Bcast(&fsize, 1, MPI_INT, 0, MPI_COMM_WORLD);
    
            int start, end;
            partitionFile(fsize, rank, size, 1, &start, &end);
    
            int array[2] = {start, end};
            char *buffer = new char[end-start+2];
    
            std::ifstream infile;
            infile.open(argv[1], std::ios::in);
            readdataSeek(infile, array, buffer);
            buffer[end-start+1] = '\0';
            std::cout << "Seeking: Rank " << rank << " has " << end-start+1 << " characters." << std::endl;
    
            infile.close() ;
            delete [] buffer;
        }
    
        MPI_Finalize();
        return 0;
    }
    

    在我的桌面上,我没有得到太大的性能差异,即使是超额订阅内核(例如,使用大量搜索):

    $ time mpirun -np 20 ./read-chunks moby-dick.txt POSIX
    Seeking: Rank 0 has 62864 characters.
    [...]
    Seeking: Rank 8 has 62865 characters.
    
    real    0m1.250s
    user    0m0.290s
    sys 0m0.190s
    
    $ time mpirun -np 20 ./read-chunks moby-dick.txt MPI
    MPI: Rank 1 has 62865 characters.
    [...]
    MPI: Rank 4 has 62865 characters.
    
    real    0m1.272s
    user    0m0.337s
    sys 0m0.265s
    

    【讨论】:

    猜你喜欢
    • 2018-01-04
    • 1970-01-01
    • 1970-01-01
    • 2013-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-04
    • 2014-01-15
    相关资源
    最近更新 更多