【发布时间】:2013-10-21 21:46:17
【问题描述】:
我很难理解 mapreduce 中的数据流。最近,当我的磁盘在减少阶段内存不足时,一项要求非常高的工作崩溃了。我发现很难估计我的工作需要多少磁盘。我将详细描述数据流。
如果有人能纠正、详细说明 mapreduce 中的数据流或就我的系统尺寸提供建议,将会很有帮助。
集群配置:
我有一个包含 30 个从属服务器的集群,其中
- 12 GB 内存
- 100 GB 硬盘
- 4 核
我的地图任务与字数统计非常相似,因此它们需要的内存很少。我的reduce 任务处理单词的排列组。由于需要加入相同的单词,reduce 函数需要一个始终
由于我有 12GB 的 RAM,而我的 hadoop 守护进程需要 1GB 的堆 + 500MB 用于操作系统,我将 map/reduce 插槽划分如下:
4 个 900MB 堆的映射槽和2 个 3GB 堆的减少槽。由于映射槽不需要超过 300MB 的内存,我将 io.sort.mb 设置为 500 MB 以改进映射阶段的内存排序。
我的工作有 1800 个地图任务,每个任务都会生成 8 GB 的地图输出。由于我使用 BZIP2 进行压缩,因此可以压缩到 1 GB。这意味着当我有 3 TB 内存时,总地图输出将低于 2 TB。
我选择了 100 个 reduce 任务,每个任务产生 5 GB 的输出。
乍一看,一切都应该适合记忆。但显然排序阶段需要压缩和解压缩,而复制阶段需要数据同时在 2 个地方(我假设)。所以这就是它变得棘手的地方,这就是为什么我想完全理解数据流。这是我认为它的工作原理,但如果我错了,请纠正我:
数据流
地图任务会生成许多 溢出(在我的情况下为 200 个),它们在内存中排序,然后在写入之前压缩到本地磁盘。一旦地图任务完成,这会给我 200 个溢出文件,每 10 个 (io.sort.factor) 合并。
这意味着有 10 个文件被解压缩:10 x (5MB -> 40MB),因此这会产生 0.4GB 的压缩/解压缩开销。虽然我不确定 200 次泄漏第一次合并后会发生什么。我想他们在每个减少任务中首先被洗牌?所以文件的大小不会真正增加太多。
如果我们从黑盒的角度来看,这意味着我们从 200 个压缩溢出开始,最终得到 100 个用于 reduce 任务的压缩文件(每个任务 1 个)。
由于我只有 60 个 reducer,现在每个节点 60 个压缩文件被复制到 reducer,这已经在映射阶段完成。这可能意味着压缩文件暂时存在于源和目标中。这意味着在这种情况下,每个节点的内存需求(暂时)上升 160 个压缩文件,是映射输出的 1.6 倍。 地图输出为 1800 GB,因此我们最终得到 2880 GB,尽管是暂时的。所以第一个reduce阶段应该能够开始并且确实如此。复制后(我希望!)数据会从映射器本地输出目录中删除,因此我们的数据量与地图输出的数据量相同,再次为 1800 GB。
现在 reducer 中的排序阶段开始了。我希望它在映射器的内存被清除之前不要启动?!由于要合并1800个map任务的输出,所以必须解压。 reduce 任务的输入大约是 mapoutput / 100 = 18 GB 的压缩数据。现在这是如何解压的,它不能一次全部解压,因为那时我每个节点会有 144GB,而且由于我的工作没有崩溃,所以解压的执行会稍微智能一些。我会以与映射阶段相同的方式思考:同时解压缩和合并 10 个文件(1800 个任务输出)。然后,解压缩将在每个合并轮中产生 18GB/180 = 100 MB 的开销。问题再次是最后一轮合并是如何发生的,我记得在 hadoop 参考中读到,reducer 不会继续合并,直到只剩下一个文件。
在 reduce 阶段排序后,reduce 阶段运行,这需要对输入记录进行解压缩,但由于每个 reduce 任务都使用 500 个输入键组,这应该不是真正的问题。
如前所述,reduce 任务向 DFS 生成大约 5GB 的输出(总共 0.5TB)。
在前 60 个 reduce 任务完成后,这项工作真的会遇到麻烦。在第二轮中,任务在排序阶段开始崩溃,这让我认为这与复制开销或解压缩开销有关。
确切的例外是:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
我希望我足够详细地解释了我的程序流程以及我对 mapreduce 的理解。如果:
- 有人可以清除有关复制阶段和合并阶段的烟雾
- 以及提供克服工作崩溃的建议。
- 能够准确估计我需要多少内存对我来说是理想的,因为如果我尝试一个有 40 个节点的集群在运行 5 天后发生崩溃,那将是令人不快的(就像这次一样),因为截止日期越来越近了。
提前致谢
我的作业失败的堆栈跟踪在这里:
例外 1:
org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
例外 2:
FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)
异常3:(可能是diskchecker异常引起的)
Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!
【问题讨论】:
-
对映射器数据流的更正:它们首先根据reduce任务进行分区,然后排序!
标签: java memory hadoop mapreduce