如何处理数据的选项:
将所有内容加载到 RAM 中并立即处理 - 如果适合...
一次只处理一行;例如逐行。如果所有处理只需要处理线本身以外的其他信息,那就太好了 - 没有共享存储,没有数据库......
结合以上两个:读取一堆项目(文本行等),处理它们,读取另一堆项目......如果您想要/需要在处理时使用共享存储(数据库)然后批量处理比一个一个处理更有效。
-
“Hadoop 风格”:使用可扩展性良好的算法和数据结构,例如映射、排序、窗口化、事件流、二分搜索 - 并将它们连接到数据处理管道中。没有共享存储。基本上这是“逐行”方法,但有一些魔法可以为您提供“正确的行”(排序、聚合、按某个键分组、前 N 个、最后 N 个......)。
李>
我的一些经验提示:
使用压缩。即使您的磁盘足够大,磁盘(或网络!)I/O 通常也是瓶颈。
尽可能使用批处理/块来一次处理/发送/保存/加载...更多项目。如果处理数据库:一次处理(选择、插入、更新...)更多项目。例如 MongoDB 有批量操作。这节省了网络 I/O 开销。
尽量减少系统调用计数(通过像上面提到的那样批量处理)。每个系统调用意味着 CPU 必须切换上下文,CPU 缓存内容消失,操作系统可能必须与硬件通信......
使用所有 CPU 内核。某些平台(Python、Ruby)在这里的进程比线程更好。
尽可能使用 CPU 缓存。例如,像数组或 C++ vector 这样的“线性”数据结构在这方面比链接列表更好。使用排序数组和二分查找代替 dict/map 和键查找 - 更小的内存占用、更小的内存碎片、更大的 CPU 缓存命中率。
将输入数据拆分为多个部分,这样即使加载数据也可以轻松实现并行化。
现在,怎么做:
您可以在“本地主机模式”下使用 Hadoop 或类似工具 - 无需使用 YARN、Zookeeper 等部署全栈。只需安装 hadoop(或类似的东西),用您的数据处理逻辑编写一些 .java 文件,编译为 .jar,在 Hadoop 中执行,完成。无需使用 HDFS(如果您不想),只需使用普通文件即可。
或者从头开始写一些东西。这里我推荐 Python,因为它适用于所有可以想象的东西(文件格式、数据库、数学库),它的 multiprocessing 模块提供了很好的工具(如进程、进程池、队列、锁、并行映射、类似 redis 的数据服务器) 使您的程序有点分散。如果您发现 Python 慢,只需将慢速部分重写为 C/C++ 并在 Python 中使用它(使用 cffi 或 Cython)。
大多数 Python 多处理功能仅限于单个主机/计算机。我认为这基本没问题,因为今天的硬件通常有很多 CPU 内核。如果没有,只需以每小时几美分的价格启动一些具有任意数量内核的 AWS EC2 实例。
让我们举个例子 - 字数统计,“大数据你好世界”。使用 Python。我将使用 618 MB 压缩和 2.35 GB 未压缩的 cswiki.xml.bz2 维基百科转储。这是一个 XML 文件,但我们会将其作为文本文件使用以保持简单:)
首先 - 处理单个文件很乏味。最好将其拆分为更小的文件,这样
输入数据可以更容易地分发给多个工作人员:
$ bzcat cswiki-20160920-pages-articles-multistream.xml.bz2 | \
split \
--filter='xz -1 > $FILE' \
--additional-suffix=.xz \
--lines=5000000 \
- cswiki-splitted.
结果:
$ ls -1hs cswiki*
618M cswiki-20160920-pages-articles-multistream.xml.bz2
94M cswiki-splitted.aa.xz
77M cswiki-splitted.ab.xz
74M cswiki-splitted.ac.xz
64M cswiki-splitted.ad.xz
62M cswiki-splitted.ae.xz
56M cswiki-splitted.af.xz
54M cswiki-splitted.ag.xz
58M cswiki-splitted.ah.xz
59M cswiki-splitted.ai.xz
15M cswiki-splitted.aj.xz
这是一个使用 multiprocessing.Pool 的简单字数统计实现:
#!/usr/bin/env python3
import lzma
import multiprocessing
from os import getpid
from pathlib import Path
import re
def main():
input_dir = Path('.')
input_files = [p for p in input_dir.iterdir() if p.name.startswith('cswiki-splitted.')]
pool = multiprocessing.Pool()
partial_results = pool.map(process_file, input_files)
aggregated_results = {}
for pr in partial_results:
for word, count in pr.items():
aggregated_results[word] = aggregated_results.get(word, 0) + count
words_and_counts = aggregated_results.items()
counts_and_words = [(c, w) for w, c in words_and_counts]
counts_and_words.sort(reverse=True)
print('Top 100:', counts_and_words[:100])
def process_file(path):
print('Process {} reading file {}'.format(getpid(), path))
f = lzma.open(str(path), 'rt')
counts = {}
for line in f:
words = re.split(r'\W+', line)
for word in words:
if word != '':
word = word.lower()
counts[word] = counts.get(word, 0) + 1
return counts
if __name__ == '__main__':
main()
输出:
$ ./wordcount.py
Process 2480 reading file cswiki-splitted.ab.xz
Process 2481 reading file cswiki-splitted.ah.xz
Process 2482 reading file cswiki-splitted.aj.xz
Process 2483 reading file cswiki-splitted.aa.xz
Process 2484 reading file cswiki-splitted.af.xz
Process 2485 reading file cswiki-splitted.ac.xz
Process 2486 reading file cswiki-splitted.ai.xz
Process 2487 reading file cswiki-splitted.ae.xz
Process 2482 reading file cswiki-splitted.ad.xz
Process 2481 reading file cswiki-splitted.ag.xz
Top 100: [(4890109, 'quot'), (4774018, 'gt'), (4765677, 'lt'), (4468312, 'id'), (4433742, 'v'), (4377363, 'a'), (2767007, 'na'), (2459957, 'text'), (2278791, 'amp'), (2114275, 'se'), (1971423, 'ref'), (1968093, 'kategorie'), (1799812, 'align'), (1795733, 'nbsp'), (1779981, 'title'), (1662895, '0'), (1592622, '1'), (1489233, 'page'), (1485505, 'je'), (1483416, 'model'), (1476711, 'format'), (1473507, '2'), (1470963, 'ns'), (1468018, 'revision'), (1467530, 'contributor'), (1467479, 'timestamp'), (1467453, 'sha1'), (1429859, 'comment'), (1414549, 'username'), (1261194, 's'), (1177526, '3'), (1159601, 'z'), (1115378, 'http'), (1040230, 'parentid'), (1012821, 'flagicon'), (949947, 'do'), (920863, 'right'), (887196, 'br'), (828466, 'x'), (797722, 've'), (795342, '4'), (783019, 'www'), (778643, '6'), (762929, 'name'), (762220, 'wiki'), (757659, 'i'), (752524, 'space'), (742525, 'xml'), (740244, 'center'), (733809, 'preserve'), (733752, 'wikitext'), (730781, 'o'), (725646, 'cz'), (679842, '5'), (672394, 'datum'), (599607, 'u'), (580936, 'byl'), (563301, 'k'), (550669, 'roce'), (546944, '10'), (536135, 'pro'), (531257, 'jako'), (527321, 'rd1'), (519607, '7'), (515398, 'roku'), (512456, 'od'), (509483, 'style'), (488923, 'za'), (485546, 'titul'), (467147, 'jméno'), (451536, '14'), (448649, '2016'), (447374, 'po'), (444325, 'citace'), (442389, 'jpg'), (424982, '12'), (423842, 'že'), (416419, 'název'), (408796, 'redirect'), (405058, 'minor'), (402733, 'to'), (400355, 'soubor'), (398188, '8'), (395652, 'the'), (393122, '11'), (389370, 'místo'), (368283, '15'), (359019, 'url'), (355302, 'monografie'), (354336, 'odkazy'), (352414, 'jsou'), (348138, 'of'), (344892, 'narození'), (340021, 'vydavatel'), (339462, '2014'), (339219, '20'), (339063, 'jeho'), (336257, '9'), (332598, 'praha'), (328268, 'byla')]
我们可以看到来自 XML 标记和属性的大量噪音。这就是在 XML 文件上运行 wordcount 得到的结果:)
所有文件读取和字数统计都是并行完成的。在主进程中只进行了最后的聚合。