【问题标题】:Processing a .txt file of 10 GB处理 10 GB 的 .txt 文件
【发布时间】:2016-09-28 13:26:38
【问题描述】:

我有一个大约 10 GB 的文本文件,我需要对文件中的文本数据进行一些处理。 读取、访问和处理如此庞大的文件的最佳方式是什么?

我正在考虑将文件分成块,然后通过处理较小的文件(或者可以在缓冲区中 - 更好)来处理它,然后合并结果。更像是 map-reduce 范式,但不会使用大数据技术。

【问题讨论】:

  • 你总是可以使用Unix来处理这么大的文件。
  • 我将进行一些复杂的处理,这将在应用程序级别完成
  • 如果您在算法中使用流式方法,您将不受任何大小的限制。此外,无需将文件分成块,然后通过处理较小的文件来处理它。打开文件,读取内存中的一个块,处理块,读取另一个块,然后合并结果
  • @UmNyobe 同意缓冲的事情

标签: file data-processing bigdata


【解决方案1】:

如何处理数据的选项:

  • 将所有内容加载到 RAM 中并立即处理 - 如果适合...

  • 一次只处理一行;例如逐行。如果所有处理只需要处理线本身以外的其他信息,那就太好了 - 没有共享存储,没有数据库......

  • 结合以上两个:读取一堆项目(文本行等),处理它们,读取另一堆项目......如果您想要/需要在处理时使用共享存储(数据库)然后批量处理比一个一个处理更有效。

  • “Hadoop 风格”:使用可扩展性良好的算法和数据结构,例如映射、排序、窗口化、事件流、二分搜索 - 并将它们连接到数据处理管道中。没有共享存储。基本上这是“逐行”方法,但有一些魔法可以为您提供“正确的行”(排序、聚合、按某个键分组、前 N 个、最后 N 个......)。

    李>

我的一些经验提示:

  • 使用压缩。即使您的磁盘足够大,磁盘(或网络!)I/O 通常也是瓶颈。

  • 尽可能使用批处理/块来一次处理/发送/保存/加载...更多项目。如果处理数据库:一次处理(选择、插入、更新...)更多项目。例如 MongoDB 有批量操作。这节省了网络 I/O 开销。

  • 尽量减少系统调用计数(通过像上面提到的那样批量处理)。每个系统调用意味着 CPU 必须切换上下文,CPU 缓存内容消失,操作系统可能必须与硬件通信......

  • 使用所有 CPU 内核。某些平台(Python、Ruby)在这里的进程比线程更好。

  • 尽可能使用 CPU 缓存。例如,像数组或 C++ vector 这样的“线性”数据结构在这方面比链接列表更好。使用排序数组和二分查找代替 dict/map 和键查找 - 更小的内存占用、更小的内存碎片、更大的 CPU 缓存命中率。

  • 将输入数据拆分为多个部分,这样即使加载数据也可以轻松实现并行化。

现在,怎么做:

您可以在“本地主机模式”下使用 Hadoop 或类似工具 - 无需使用 YARN、Zookeeper 等部署全栈。只需安装 hadoop(或类似的东西),用您的数据处理逻辑编写一些 .java 文件,编译为 .jar,在 Hadoop 中执行,完成。无需使用 HDFS(如果您不想),只需使用普通文件即可。

或者从头开始写一些东西。这里我推荐 Python,因为它适用于所有可以想象的东西(文件格式、数据库、数学库),它的 multiprocessing 模块提供了很好的工具(如进程、进程池、队列、锁、并行映射、类似 redis 的数据服务器) 使您的程序有点分散。如果您发现 Python ,只需将慢速部分重写为 C/C++ 并在 Python 中使用它(使用 cffi 或 Cython)。

大多数 Python 多处理功能仅限于单个主机/计算机。我认为这基本没问题,因为今天的硬件通常有很多 CPU 内核。如果没有,只需以每小时几美分的价格启动一些具有任意数量内核的 AWS EC2 实例。

让我们举个例子 - 字数统计,“大数据你好世界”。使用 Python。我将使用 618 MB 压缩和 2.35 GB 未压缩的 cswiki.xml.bz2 维基百科转储。这是一个 XML 文件,但我们会将其作为文本文件使用以保持简单:)

首先 - 处理单个文件很乏味。最好将其拆分为更小的文件,这样 输入数据可以更容易地分发给多个工作人员:

$ bzcat cswiki-20160920-pages-articles-multistream.xml.bz2 | \
    split \
        --filter='xz -1 > $FILE' \
        --additional-suffix=.xz \
        --lines=5000000 \
        - cswiki-splitted.

结果:

$ ls -1hs cswiki*
618M cswiki-20160920-pages-articles-multistream.xml.bz2
 94M cswiki-splitted.aa.xz
 77M cswiki-splitted.ab.xz
 74M cswiki-splitted.ac.xz
 64M cswiki-splitted.ad.xz
 62M cswiki-splitted.ae.xz
 56M cswiki-splitted.af.xz
 54M cswiki-splitted.ag.xz
 58M cswiki-splitted.ah.xz
 59M cswiki-splitted.ai.xz
 15M cswiki-splitted.aj.xz

这是一个使用 multiprocessing.Pool 的简单字数统计实现:

#!/usr/bin/env python3

import lzma
import multiprocessing
from os import getpid
from pathlib import Path
import re

def main():
    input_dir = Path('.')
    input_files = [p for p in input_dir.iterdir() if p.name.startswith('cswiki-splitted.')]

    pool = multiprocessing.Pool()
    partial_results = pool.map(process_file, input_files)

    aggregated_results = {}
    for pr in partial_results:
        for word, count in pr.items():
            aggregated_results[word] = aggregated_results.get(word, 0) + count

    words_and_counts = aggregated_results.items()
    counts_and_words = [(c, w) for w, c in words_and_counts]
    counts_and_words.sort(reverse=True)
    print('Top 100:', counts_and_words[:100])

def process_file(path):
    print('Process {} reading file {}'.format(getpid(), path))
    f = lzma.open(str(path), 'rt')
    counts = {}
    for line in f:
        words = re.split(r'\W+', line)
        for word in words:
            if word != '':
                word = word.lower()
                counts[word] = counts.get(word, 0) + 1
    return counts

if __name__ == '__main__':
    main()

输出:

$ ./wordcount.py
Process 2480 reading file cswiki-splitted.ab.xz
Process 2481 reading file cswiki-splitted.ah.xz
Process 2482 reading file cswiki-splitted.aj.xz
Process 2483 reading file cswiki-splitted.aa.xz
Process 2484 reading file cswiki-splitted.af.xz
Process 2485 reading file cswiki-splitted.ac.xz
Process 2486 reading file cswiki-splitted.ai.xz
Process 2487 reading file cswiki-splitted.ae.xz
Process 2482 reading file cswiki-splitted.ad.xz
Process 2481 reading file cswiki-splitted.ag.xz
Top 100: [(4890109, 'quot'), (4774018, 'gt'), (4765677, 'lt'), (4468312, 'id'), (4433742, 'v'), (4377363, 'a'), (2767007, 'na'), (2459957, 'text'), (2278791, 'amp'), (2114275, 'se'), (1971423, 'ref'), (1968093, 'kategorie'), (1799812, 'align'), (1795733, 'nbsp'), (1779981, 'title'), (1662895, '0'), (1592622, '1'), (1489233, 'page'), (1485505, 'je'), (1483416, 'model'), (1476711, 'format'), (1473507, '2'), (1470963, 'ns'), (1468018, 'revision'), (1467530, 'contributor'), (1467479, 'timestamp'), (1467453, 'sha1'), (1429859, 'comment'), (1414549, 'username'), (1261194, 's'), (1177526, '3'), (1159601, 'z'), (1115378, 'http'), (1040230, 'parentid'), (1012821, 'flagicon'), (949947, 'do'), (920863, 'right'), (887196, 'br'), (828466, 'x'), (797722, 've'), (795342, '4'), (783019, 'www'), (778643, '6'), (762929, 'name'), (762220, 'wiki'), (757659, 'i'), (752524, 'space'), (742525, 'xml'), (740244, 'center'), (733809, 'preserve'), (733752, 'wikitext'), (730781, 'o'), (725646, 'cz'), (679842, '5'), (672394, 'datum'), (599607, 'u'), (580936, 'byl'), (563301, 'k'), (550669, 'roce'), (546944, '10'), (536135, 'pro'), (531257, 'jako'), (527321, 'rd1'), (519607, '7'), (515398, 'roku'), (512456, 'od'), (509483, 'style'), (488923, 'za'), (485546, 'titul'), (467147, 'jméno'), (451536, '14'), (448649, '2016'), (447374, 'po'), (444325, 'citace'), (442389, 'jpg'), (424982, '12'), (423842, 'že'), (416419, 'název'), (408796, 'redirect'), (405058, 'minor'), (402733, 'to'), (400355, 'soubor'), (398188, '8'), (395652, 'the'), (393122, '11'), (389370, 'místo'), (368283, '15'), (359019, 'url'), (355302, 'monografie'), (354336, 'odkazy'), (352414, 'jsou'), (348138, 'of'), (344892, 'narození'), (340021, 'vydavatel'), (339462, '2014'), (339219, '20'), (339063, 'jeho'), (336257, '9'), (332598, 'praha'), (328268, 'byla')]

我们可以看到来自 XML 标记和属性的大量噪音。这就是在 XML 文件上运行 wordcount 得到的结果:)

所有文件读取和字数统计都是并行完成的。在主进程中只进行了最后的聚合。

【讨论】:

  • 非常感谢您的详细回答。欣赏它。
【解决方案2】:

如果将所有 10 GB 加载到内存中,一切都很简单。

如果您负担不起,那么您一次只能将大文件的一部分加载到缓冲区中。

当您处理完一部分后,您滑动窗口(更改范围),将新的数据范围加载到缓冲区中,因此缓冲区中先前的数据范围将被丢弃(覆盖)。

您可能会寻找所需的位置,并且可能需要来回来回加载数据。它可能相对较慢,但这是您为使用较少内存(时空权衡)所付出的代价。

--

您可能想阅读可以处理大文件的程序的源代码。例如。文件归档器。

【讨论】:

    【解决方案3】:

    我会使用线程将文件的每个块加载到缓冲区中,然后处理缓冲区并执行您需要的操作。然后加载更多缓冲区从内存中删除以前的缓冲区并继续。查看音频是如何加载的,就像您想要的那样加载到缓冲区中。

    【讨论】:

    • 如果你正确地流式传输所有内容,你可以线程,它更有效但更困难。
    猜你喜欢
    • 1970-01-01
    • 2021-10-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多