【问题标题】:Detailed dataflow in hadoop's mapreduce?hadoop的mapreduce中的详细数据流?
【发布时间】:2013-10-21 21:46:17
【问题描述】:

我很难理解 mapreduce 中的数据流。最近,当我的磁盘在减少阶段内存不足时,一项要求非常高的工作崩溃了。我发现很难估计我的工作需要多少磁盘。我将详细描述数据流。

如果有人能纠正、详细说明 mapreduce 中的数据流或就我的系统尺寸提供建议,将会很有帮助。

集群配置:

我有一个包含 30 个从属服务器的集群,其中

  • 12 GB 内存
  • 100 GB 硬盘
  • 4 核

我的地图任务与字数统计非常相似,因此它们需要的内存很少。我的reduce 任务处理单词的排列组。由于需要加入相同的单词,reduce 函数需要一个始终

由于我有 12GB 的 RAM,而我的 hadoop 守护进程需要 1GB 的堆 + 500MB 用于操作系统,我将 map/reduce 插槽划分如下:

4 个 900MB 堆的映射槽2 个 3GB 堆的减少槽。由于映射槽不需要超过 300MB 的内存,我将 io.sort.mb 设置为 500 MB 以改进映射阶段的内存排序。

我的工作有 1800 个地图任务,每个任务都会生成 8 GB 的地图输出。由于我使用 BZIP2 进行压缩,因此可以压缩到 1 GB。这意味着当我有 3 TB 内存时,总地图输出将低于 2 TB

我选择了 100 个 reduce 任务,每个任务产生 5 GB 的输出。

乍一看,一切都应该适合记忆。但显然排序阶段需要压缩和解压缩,而复制阶段需要数据同时在 2 个地方(我假设)。所以这就是它变得棘手的地方,这就是为什么我想完全理解数据流。这是我认为它的工作原理,但如果我错了,请纠正我:

数据流

地图任务会生成许多 溢出(在我的情况下为 200 个),它们在内存中排序,然后在写入之前压缩到本地磁盘。一旦地图任务完成,这会给我 200 个溢出文件,每 10 个 (io.sort.factor) 合并。 这意味着有 10 个文件被解压缩:10 x (5MB -> 40MB),因此这会产生 0.4GB 的压缩/解压缩开销。虽然我不确定 200 次泄漏第一次合并后会发生什么。我想他们在每个减少任务中首先被洗牌?所以文件的大小不会真正增加太多。 如果我们从黑盒的角度来看,这意味着我们从 200 个压缩溢出开始,最终得到 100 个用于 reduce 任务的压缩文件(每个任务 1 个)。

由于我只有 60 个 reducer,现在每个节点 60 个压缩文件被复制到 reducer,这已经在映射阶段完成。这可能意味着压缩文件暂时存在于源和目标中。这意味着在这种情况下,每个节点的内存需求(暂时)上升 160 个压缩文件,是映射输出的 1.6 倍。 地图输出为 1800 GB,因此我们最终得到 2880 GB,尽管是暂时的。所以第一个reduce阶段应该能够开始并且确实如此。复制后(我希望!)数据会从映射器本地输出目录中删除,因此我们的数据量与地图输出的数据量相同,再次为 1800 GB。

现在 reducer 中的排序阶段开始了。我希望它在映射器的内存被清除之前不要启动?!由于要合并1800个map任务的输出,所以必须解压。 reduce 任务的输入大约是 mapoutput / 100 = 18 GB 的压缩数据。现在这是如何解压的,它不能一次全部解压,因为那时我每个节点会有 144GB,而且由于我的工作没有崩溃,所以解压的执行会稍微智能一些。我会以与映射阶段相同的方式思考:同时解压缩和合并 10 个文件(1800 个任务输出)。然后,解压缩将在每个合并轮中产生 18GB/180 = 100 MB 的开销。问题再次是最后一轮合并是如何发生的,我记得在 hadoop 参考中读到,reducer 不会继续合并,直到只剩下一个文件。

在 reduce 阶段排序后,reduce 阶段运行,这需要对输入记录进行解压缩,但由于每个 reduce 任务都使用 500 个输入键组,这应该不是真正的问题。

如前所述,reduce 任务向 DFS 生成大约 5GB 的输出(总共 0.5TB)。

在前 60 个 reduce 任务完成后,这项工作真的会遇到麻烦。在第二轮中,任务在排序阶段开始崩溃,这让我认为这与复制开销或解压缩开销有关。

确切的例外是:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3

我希望我足够详细地解释了我的程序流程以及我对 mapreduce 的理解。如果:

  • 有人可以清除有关复制阶段和合并阶段的烟雾
  • 以及提供克服工作崩溃的建议。
  • 能够准确估计我需要多少内存对我来说是理想的,因为如果我尝试一个有 40 个节点的集群在运行 5 天后发生崩溃,那将是令人不快的(就像这次一样),因为截止日期越来越近了。

提前致谢

我的作业失败的堆栈跟踪在这里:

例外 1:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

例外 2:

FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)

异常3:(可能是diskchecker异常引起的)

Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!

【问题讨论】:

  • 对映射器数据流的更正:它们首先根据reduce任务进行分区,然后排序!

标签: java memory hadoop mapreduce


【解决方案1】:

我刚刚收到来自 Praveen Sripati 的电子邮件,其中提到了 hadoop 参考,我将其粘贴在这里:

在复制阶段,数据是否同时存在于 map 和 reduce 任务中?地图输出什么时候清零?

以下内容来自 Hadoop - The Definitive Guide

主机不会在第一个 reducer 后立即从磁盘中删除 map 输出 已检索它们,因为减速器随后可能会失败。反而, 他们等到他们被jobtracker告知删除它们(或 应用程序主),这是在作业完成之后。

这很重要,地图输出保留在磁盘上!!对我来说有点不幸。

5) 然后reducers中的合并开始。不完全确定它是如何完成的。每个reduce键是否合并到一个文件?还是将所有内容合并为一个任务?

又出自同一本书

当所有的 map 输出都被复制后,reduce 任务进入 排序阶段(应该正确地称为合并阶段,如 排序是在地图端进行的),它合并了地图 输出,保持它们的排序顺序。这是轮流进行的。为了 例如,如果有 50 个地图输出并且合并因子为 10( 默认值,由 io.sort.factor 属性控制,就像在 地图的合并),将有五轮。每轮将合并 10 文件合二为一,所以最后会有五个中间文件。

谢谢,普拉文

这意味着合并后的文件数限制为 io.sort.factor。就我而言,有 10 个段,每个段 1.8GB。在最后一次合并期间,所有内容都必须解压缩,因此每轮需要 1.8 * 10 GB = 18 GB。

【讨论】:

  • 对我来说,一个解决方案可能是将mapred.local.dir 更改为外部存储(我们在集群上安装了共享)。缺点是在排序阶段需要网络来传输数据,但从好的方面来说,DFS 将有足够的空间来完成其余的工作。
猜你喜欢
  • 2010-12-30
  • 2014-10-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多