【问题标题】:Parallel Reading/Writing File in cc语言中的并行读/写文件
【发布时间】:2012-02-13 22:37:52
【问题描述】:

问题是n个进程同时读取一个大小约为20GB的文件。文件在每一行包含一个字符串,并且字符串的长度可能相同也可能不同。字符串长度最多为 10 个字节。

我有一个包含 16 个节点的集群。每个节点都是单处理器,有 6GB RAM。我正在使用 MPI 编写并行代码。

对这个大文件进行分区以便利用所有资源的有效方法是什么?

注意:对分区的限制是将文件作为固定行数的块读取。 假设文件包含 1600 行(例如 1600 个字符串)。那么第一个进程应该从第 1 行读取到第 100 行,第二个进程应该从第 101 行读取到第 200 行,依此类推......

我认为一个文件一次不能被多个进程读取,因为我们只有一个文件处理程序指向某个地方只有一个字符串。那么其他进程如何从不同的块中并行读取呢?

【问题讨论】:

    标签: algorithm parallel-processing mpi


    【解决方案1】:

    因此,正如您所发现的,文本文件格式不适合处理大量数据;它们不仅比二进制格式大,而且您会遇到像这里这样的格式问题(搜索换行符),而且一切都非常慢(数据必须转换为字符串)。基于文本的格式和数字数据的二进制格式之间的 IO 速度很容易相差 10 倍。但我们现在假设你被文本文件格式卡住了。

    大概是为了提高速度而进行分区。但是除非你有一个并行文件系统——也就是说,多个服务器从多个磁盘提供服务,以及一个可以保持这些协调的 FS——否则你不太可能通过从同一个文件中读取多个 MPI 任务来获得显着的加速,因为最终这些请求都将在服务器/控制器/磁盘级别进行序列化。

    此外,读取大块数据将比 fseek() 到处寻找换行符进行小读取要快得多。

    所以我的建议是让一个进程(也许是最后一个)以尽可能少的块读取所有数据,并将相关行发送到每个任务(最后包括它本身)。如果您知道文件开头有多少行,这相当简单;读入 2 GB 数据,在内存中搜索 N/Pth 行的末尾,并将其发送到任务 0,向任务 0 发送“已完成数据”消息,然后继续。

    【讨论】:

    • 感谢您提供如此有价值的见解。我在集群中配置了 NFS,并且通过将文件指针偏移到 (myrank * N/Pth) 行来读取大块数据。当我使用 MPI_File_read(few parameters) 函数从文件中读取这么多数据块时,在这里我必须将第三个参数作为要读取的字节数传递但我的要求是读取 N/Pth 个块行不是字节数。那么我们如何摆脱困境呢?
    • 先生,请给我紧急的解决方案。
    • 你不能;这就是为什么文本文件对于大量数据来说是个坏主意。您只能通过实际搜索文件来知道行在哪里,这非常昂贵。这就是为什么我建议在一个处理器上阅读全部内容并分发文件的原因。如果您有一个具有固定记录长度的二进制文件,则 MPI-IO 方法将是一种不错的方法(尽管单个 NFS 服务器的性能可能仍然有限。)
    【解决方案2】:

    您没有指定分区是否有任何限制,所以我假设没有。我还假设您希望分区的大小尽可能接近。

    天真的方法是将文件拆分为大小为20GB/n 的块。对于i=0..n-1,块i 的起始位置将是i*20GB/n

    当然,问题在于不能保证块边界会落在输入文件的行之间。一般来说,他们不会。

    幸运的是,有一种简单的方法可以解决此问题。如上所述建立边界后,稍微移动它们,使它们中的每一个(i=0 除外)都放在以下换行符之后。

    这将涉及读取文件的 15 个小片段,但会产生非常均匀的分区。

    事实上,校正可以由每个节点单独完成,但可能不值得将解释复杂化。

    【讨论】:

    • 非常感谢先生!根据您富有成效的回答,我添加了我的限制条件。
    • 先生,您的均匀分布逻辑非常好。但是我们一次不能通过多个进程读取一个文件,因为我认为我们只有一个文件处理程序,它指向某个地方只有一个字符串。那么其他进程如何从不同的块中并行读取呢?
    【解决方案3】:

    我认为最好编写一段代码来获取行长并将行分配给进程。该分配函数不适用于字符串本身,而仅适用于它们的长度。

    找到一个算法来均匀分布固定大小的源不是问题。

    然后,分发函数将告诉其他进程他们必须获得哪些工作。进程 0(分配器)将读取一行。它已经知道,该行 num. 1 应该由进程 1 工作。 ... P.0 读取第 num 行。 N 并且知道必须使用哪个进程。

    哦!我们不需要从一开始就优化分布。只需分发器进程从输入中读取一个新行并将其提供给一个空闲进程。就是这样。

    因此,您甚至有两种解决方案:一种是高度优化的,一种是简单的。

    如果分发器进程不时重新优化未读的字符串,我们可以达到更高的优化。

    【讨论】:

    • 先生,我认为字符串的长度是可变的。那么找到这么大文件的字符串长度并将它们分布在整个集群中会非常复杂
    • 尊敬的先生,我认为分配函数不适用于字符串本身,而是它们的长度。这有什么问题?
    • 然后分发函数将告诉其他进程他们必须获得哪些工作。
    • 先生,我没有参与其中。你能多探索一下吗?
    • 先生,我已经添加到答案中了。
    【解决方案4】:

    这是 python 中的一个函数,它使用 mpi 和 pypar 扩展来读取大文件中的行数,使用 mpi 在多个主机之间分配职责。

    def getFileLineCount( file1 ):
        import pypar, mmap, os
        """
        uses pypar and mpi to speed up counting lines
        parameters:
            file1 - the file name to count lines
        returns:
            (line count)
        """
    
        p1 = open( file1, "r" )
        f1 = mmap.mmap( p1.fileno(), 0, None, mmap.ACCESS_READ )
    
        #work out file size
        fSize = os.stat( file1 ).st_size
        #divide up to farm out line counting
        chunk = ( fSize / pypar.size() ) + 1
    
        lines = 0
        #set start and end locations
        seekStart = chunk * ( pypar.rank() )
        seekEnd = chunk * ( pypar.rank() + 1 )
        if seekEnd > fSize:
            seekEnd = fSize
    
        #find start of next line after chunk
        if pypar.rank() > 0:
            f1.seek( seekStart )
            l1 = f1.readline()
            seekStart = f1.tell()
    
        #tell previous rank my seek start to make their seek end
        if pypar.rank() > 0:
    #        logging.info( 'Sending to %d, seek start %d' % ( pypar.rank() - 1, seekStart ) )
            pypar.send( seekStart, pypar.rank() - 1 )
        if pypar.rank() < pypar.size() - 1:
            seekEnd = pypar.receive( pypar.rank() + 1 )
    #        logging.info( 'Receiving from %d, seek end %d' % ( pypar.rank() + 1, seekEnd ) )
    
        f1.seek( seekStart )
    
        logging.info( 'Calculating line lengths and positions from file byte %d to %d' % ( seekStart, seekEnd ) )
    
        l1 = f1.readline()
        prevLine = l1
    
        while len( l1 ) > 0:
            lines += 1
    
            l1 = f1.readline()
            if f1.tell() > seekEnd or len( l1 ) == 0:
                break
    
            prevLine = l1
        #while
        f1.close()
        p1.close()
    
        if pypar.rank() == 0:
            logging.info( 'Receiving line info' )
            for p in range( 1, pypar.size() ):
                lines += pypar.receive( p )
        else:
            logging.info( 'Sending my line info' )
            pypar.send( lines, 0 )
    
        lines = pypar.broadcast( lines )
        return ( lines )
    

    【讨论】:

    • 因为它就在那里。我有一个应用程序需要记录每行的文件位置以及它们的宽度,我采用了该代码并删除了此示例中有趣的部分。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-21
    • 2022-12-31
    • 2019-11-01
    • 2013-11-13
    • 2011-03-01
    • 2017-03-18
    • 1970-01-01
    相关资源
    最近更新 更多