【发布时间】:2016-08-20 19:53:58
【问题描述】:
更新到注意更改聚合键
说明
我需要在匿名环境中从文件列中删除出现次数少于指定次数的所有值。将指定多个列,这些列要求值在该列中出现 X 次以上,并且空白相对于一列。这意味着如果 X = 4 并且值 123 在第 1 列中出现 3 次,在第 4 列中出现 4 次,则所有 3 次出现在第 1 列中必须为空白,但在第 4 列中是允许的。
我使用 Hadoop Streaming 和 Perl 的 2 pass 方法解决了这个问题,该方法在过去半年里一直很稳定。问题是提供了一个新文件,虽然我的进程会处理它,但在我们的 862 容器(54 节点)hadoop 集群上处理大约需要 5 天。我将解释我的解决方案/方法,并要求任何适合此问题并允许优化运行时间的优化或替代方法。
正在处理的文件统计信息:
- 大小 1.4TB
- 列 19,427
- 记录 2210 万
- 元素 429,336,700,000
2 通过方法(当前)
通过 1
此 Pass 使用带有 Perl (v5.10.1) Mapper 和 Perl reducer 的 Hadoop 流来获取输入文件的每一列(相对于每一列的计数)中的每个值的计数,如果计数为大于包含列、值、计数的指定 X。
映射器:使用 Perl 散列来构建包含列、值、计数的散列,其中计数随着该列的值的每次出现而递增:
$ColValCntHash{ $col }{ $value }++;
由于这是使用 hadoop 流的映射器,我以以下键值格式打印结果,以允许对 reducer 进行更分布式的分配:
col|value\tcount
这会生成列 # 和字段值的聚合键,然后将该列中该值的计数分配为键。
Reducer:读取传递给它的 col|value\tcount,解析键,构建一个散列来对从多个映射器传入的值的计数求和。
$FinalCountHash{ $col }{ $value } += $count;
最后,如果计数小于指定的X,每个reducer将col、value和count打印到输出文件中。
第一次通过结果:第 1 次生成一个“查找”列表,该列表来源于第 2 次出现消隐的地方,其结构如下:
Col\tValue\tCount
1\t123\t3
3\t234\t2
通过 2
如果输入文件中的值存在于通过 1 生成的查找文件中,则此通道使用仅带有 Perl 映射器(无缩减阶段)的 Hadoop 流式处理。
映射器: 映射器要求查找文件与分发到每个节点的 Perl 代码一起发送,以便可以使用它来构建查找哈希,例如:
$aggKey = $col . "|" . $value;
$LookupHash{ $aggKey }=1;
aggKey 用于减少散列哈希的内存开销,现在只是一个散列,它使我可以缓冲到内存中的值数量达到大约 2200 万。
当输入记录通过映射器时,列被迭代并检查哈希以查看列和值是否存在。如果是这样,则将列中的值替换为空白。
通过 2 输出: Pass 2 生成了一个输出文件(部分文件),其中包含输入文件的“匿名”版本。
问题
尽管集群有足够的内存(总计 2.4TB),但它在容器之间分配,并且在容器因内存问题而失败之前,最多通过 2 可以从查找文件中加载大约 22 百万个值。这需要通过将查找文件拆分为 22 百万个块并在第 2 遍的多遍中针对主文件运行它们来在输入上发生迭代消隐。10-100 GB 文件的整个过程在 5 到 20 分钟内运行,具体取决于要空白的值的数量。
现在我有一个 1.4TB 大小的文件来处理这个文件,其中包含近 20k 列,生成的查找文件包含 46 亿条记录(大约占文件总值的 1%,相对于以前的文件百分比) .
由于一次只能加载 22mil 查找值的内存限制......这使得我当前的进程需要在 pass 2 210 次中传递输入文件......并且它必须读取 1.4 TB 文件(其中每次移除都会略微收缩),每次移除大约需要 25 分钟。
我希望这充分描述了问题/当前解决方案/问题。任何帮助将不胜感激。
谢谢!
【问题讨论】:
标签: perl hadoop hadoop2 hadoop-streaming