【发布时间】:2018-08-06 10:12:54
【问题描述】:
在 Hortonworks 集群上通过 Jupyter notebook 使用 Pyspark 1.6.2 处理以下步骤时,我们遇到了一个奇怪的情况:
- 从 pyspark 数据帧中的 ORC 表中读取数据
- 通过
pivot_column(pivoted_df) 透视此表 - 在
pivoted_df的特定选择上添加一些计算列:calced_df = pivoted_df.select(dependency_list).run_calculations() - 在大表
pivoted_df(列 > 1.600)和“小”表calced_df(仅 ~270 列)上进行内连接以合并所有列 - 保存到 Parquet 表
(在第 3 步中选择是必要的,因为否则使用 withColumn 语句添加一些计算字段将需要很长时间。在有很多列的表上,选择 + 连接比 withColumn 更快)
但是,对于 pivot_column 的变化少于 2.500 个的数据集,这项工作运行良好。例如,我们成功地处理了从 75.430.000 行和 1.600 列开始的作业。当我们使用另一个数据集处理作业时,包含更少的行 (50.000) 和更多的列 (2.433),它也可以工作。
但最终作业在最后一步崩溃,因为pivot_column 有超过 2500 个变体(并且只有大约 70.000 行),并带有 Stackoverflow 错误。我们使用一些show() 操作调试了单个步骤,以检查作业失败的确切位置。我们发现,直到第 4 步中的加入,一切正常。所以加入导致了问题,从这一步开始,我们收到以下消息:
Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.StackOverflowError
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
at scala.collection.AbstractTraversable.to(Traversable.scala:105)
at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
但是究竟是什么导致了这个错误,我们该如何避免呢?
我们当前的 Spark 配置:
.setMaster("yarn-client") \
.set("spark.executor.instances","10") \
.set("spark.executor.cores","4") \
.set("spark.executor.memory","10g") \
.set("spark.driver.memory","8g") \
.set("spark.yarn.executor.memoryOverhead","1200") \
.set("spark.sql.pivotMaxValues", "6000") \
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
非常感谢!
【问题讨论】:
-
Spark 非常适合高表(很多行),但它不能很好地扩展到宽表(很多列)。您可能会遇到系统可以处理的限制 - 请参阅this comment。
-
更多讨论在this question的cmets中。在实践中,我在这些情况下看到的做法是将宽表分解为多个较薄的表。然后当你需要使用它时,使用主键进行连接。
标签: apache-spark pyspark apache-spark-sql pivot stack-overflow