【问题标题】:Mahout spark-itemsimilarity saveAsTextFile final stage is very slowMahout spark-itemsimilarity saveAsTextFile 最后阶段很慢
【发布时间】:2016-02-06 16:09:47
【问题描述】:

我在 cli 的 HDP 2.2 集群上以 YARN 客户端模式在 Spark 1.5.1 上使用 Mahout 0.11.0。我的输入大约是 325Mb,分成 1000 个部分文件。这是我调用的确切命令:

$MAHOUT_HOME/bin/mahout spark-itemsimilarity --input unit-similarity-dump/bpc1 --output mahout-cooccurrence-output4 --maxPrefs 200 --maxSimilaritiesPerItem 100 --master yarn-client --sparkExecutorMem 10g -D:spark.yarn.executor.memoryOverhead=1024 -D:spark.executor.cores=5 -D:spark.executor.instances=50 -D:spark.yarn.am.memory=4g -D:spark.yarn.am.memoryOverhead=512 -D:spark.yarn.am.cores=2 -D:spark.driver.memory=20g -D:spark.yarn.driver.memoryOverhead=2048 -D:spark.driver.cores=4 -D:spark.driver.maxResultSize=10g -D:spark.yarn.queue=product -D:hdp.version=2.2.6.0-2800

应用程序运行良好,直到最后阶段,saveAsTextFile 被调用。在这一点上,任务逐渐停止,每项任务都需要 45 分钟到一个小时才能成功。仔细观察,似乎每个任务都在读取 MapPartitionsRDD 的所有 1000 个分区,我认为,直观地说,这一定是性能问题的根源。这些分区在所有 executor 中分​​布均匀,因此我认为每个任务都需要从不是其直接父级的 n-1 个 executor 请求所有分区。

优化此应用程序的最佳方法是什么?更少的分区,所以需要更少的远程数据?更少的执行者,因此每个任务的本地化数据百分比更高?尝试为 RDD 强制使用更高的复制因子?现在它似乎默认为 Storage Level: Memory Deserialized 1x Replicated,100% 缓存。

为了清楚起见,这里是舞台细节的截图:saveAsTextFile stage

提前感谢您提供任何见解。

更新:

我尝试仅使用 1 个多核执行器(即任务),尽管所有 RDD 分区都存在于单个本地执行器上,但性能仍然非常缓慢。我认为剩下的唯一罪魁祸首是reduceByKey 在最终的saveAsTextFile DAG 中造成的洗牌。

第二次更新:

我也尝试只使用 1 个输入分区,而我之前使用的是 100 甚至 1000 个。结果非常相似,总结为 here。为了清楚起见,在这次运行中,我使用了一个 5 核的 20g 执行器。然而,这种方法确实导致总资源分配减少了大约一个数量级(以 MB 秒和 vcore 秒为单位衡量)。这可能是由于先前运行中执行器和线程的过度分配造成的,这意味着瓶颈可能不受计算限制。

【问题讨论】:

  • 您的问题是否有进一步的更新?你修好了吗?我在使用 spark 1.6.3 和 Mahout 0.13 时遇到了同样的问题,但无法弄清楚。

标签: apache-spark mahout mahout-recommender collaborative-filtering


【解决方案1】:

不确定我是否遵循上述所有描述。有一个 BiMap 双向字典,可将列和行 id 从序数 Mahout id 转换为字符串外部 id。这些数据结构在内存中,每种类型的 id(行/列)相当于 2 个哈希图。 reduceByKey 对 Mahout id 起作用,因此翻译仅在输入和输出期间发生。这些数据结构被读入驱动程序,然后广播到每个节点,每个节点只制作一份副本,其中 BiMap(实际上是 BiDictionary)由执行程序共享。

默认情况下,分区设置为“自动”。 Mahout 11 中的哪个值应该是针对共现计算优化的值,这就是事情“嗡嗡作响”的原因。

最后一步之后reduceByKey获取剩余矩阵(行键,向量)中的每个值,将键和向量元素的每个id转换回字符串,并将文本写入文件并行。

坦率地说,我发现文本文件的读写很大程度上依赖于手动调整。我的主要经验是并行读取,其中 Spark 在分区之前读取所有文件统计信息——全部。与读取 1000 个文件并在阅读之前将它们连接到一个文件相比,这非常慢(自己尝试一下,他们可能已经解决了这个问题)。

在我看来,您需要更好的 saveAsTextFilesaveAsTextFile 的手动调整最好使用您自己的分布式操作来完成,foreach 在根据您自己的参数对 RDD 进行一些重新分区后工作。在此处查看文档:http://spark.apache.org/docs/latest/programming-guide.html#printing-elements-of-an-rdd

如果您想进行实验,请将 TextDelimitedIndexedDatasetReaderWriter 子类化以提供您自己的 writer Trait。 Mahout 也有一个可以使用的mapBlock 操作。它将一行行块传递给每个mapBlock,您可以使用 BiMap 编写这些行来转换 id。

希望听到 Mahout 用户列表中的任何结果。

【讨论】:

  • 嗨,帕特。首先,感谢您的回答,其次,我非常喜欢 Occam 的弯刀以及您在协同过滤领域的工作。感谢您的所有贡献!在消化了你的答案并进一步挖掘之后,我认为最终的 BiMap int 字符串可能需要这么多资源。我总共有 120 万个唯一 id,415k 行,每行最多有 100 个相似之处,我的应用程序特定 id 是大约 20 个字符长的字符串。虽然这看起来不太“大”,但我认为这是罪魁祸首,因为根据 UI,所有任务时间都花在了执行程序计算上。
  • Bimap 的运行时间是 O(log(n)) 复杂度,优于 O(n),我们通常衡量好坏的标准。这里 n 是非零矩阵元素的数量。 BiMap 只是两个 HashMap,并且只有一个用于写操作中的 Int->String 查找。对于反向查找,我们可能会使用数组,因为 Ints 是序号 Mahout ID,因此是优化的主题。我仍然怀疑这是一个分区问题,这取决于有多少文件被用作输入。要对此进行测试,请将您的输入文件连接到一个文件中并再次运行该作业。
  • 仅使用 1 个输入文件对应用程序的挂钟时间影响很小,但总体影响不大。最值得注意的是,第 18 阶段的任务时间仍在 45-60 分钟范围内,尽管要执行的任务时间较少(由于整体分区较少)。值得注意的一件事是第 17 阶段的时间大幅增加。有 100 个分区,这个阶段需要 46 秒才能完成。使用 1 个输入零件文件,产生 4 个 RDD 分区,因此有 4 个任务,第 17 阶段需要 20 分钟才能完成。我已经用相关的工作统计数据更新了这个问题,供您阅读。
  • 不确定你说的是什么问题,BiMap 查找还是 saveAsTextfile?
  • 我的意思是,根据实验证据,输入分区数似乎对最终saveAsTextFile 阶段任务的性能没有任何影响,尽管它显然会影响第 17 阶段的表现(这不是这篇文章的重点,但也许很有趣)。在我看来,同样有趣的是,当我只使用 1 个执行程序马力最小的分区时,整个应用程序的挂钟时间几乎相等,而许多分区的执行程序马力最大。这让我相信性能瓶颈既不是 CPU 也不是内存限制。
猜你喜欢
  • 2017-08-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-11
  • 2015-11-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多