【发布时间】:2017-08-23 07:51:16
【问题描述】:
所以我一直在尝试对数据集执行 cumsum 操作。我想强调的是,我希望我的 cumsum 发生在我的数据集的分区上(例如,随着时间的推移,对于 personA 的 feature1 的 cumsum)。
我知道该怎么做,而且它可以完美地“独立”运行——我稍后会解释这部分。这是执行此操作的代码:
// it's admitted that this DF contains all data I need
// with one column/possible value, with only 1/0 in each line
// 1 <-> feature has the value
// 0 <-> feature doesn't contain the value
// this DF is the one I get after the one-hot operation
// this operation is performed to apply ML algorithms on features
// having simultaneously multiple values
df_after_onehot.createOrReplaceTempView("test_table")
// @param DataFrame containing all possibles values eg. A, B, C
def cumSumForFeatures(values: DataFrame) = {
values
.map(value => "CAST(sum(" + value(0) + ") OVER (PARTITION BY person ORDER BY date) as Integer) as sum_" + value(0))
.reduce(_+ ", " +_)
}
val req = "SELECT *, " + cumSumForFeatures(possible_segments) + " FROM test_table"
// val req = "SELECT * FROM test_table"
println("executing: " + req)
val data_after_cumsum = sqLContext.sql(req).orderBy("person", "date")
data_after_cumsum.show(10, false)
当我尝试使用之前的一些预处理执行相同的操作(例如 one-hot 操作或之前添加计算特征)时,就会出现问题。我尝试了一个非常小的数据集,但它不起作用。
这是打印的堆栈跟踪(至少应该让您感兴趣的部分):
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
[Executor task launch worker-3] ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
所以它似乎与 GC 问题/JVM 堆大小有关?我只是不明白它与我的预处理有什么关系?
- 我尝试在不再使用的 DF 上执行 unpersist 操作。
- 我尝试修改我机器上的选项(例如 -Xmx2048m)。
- 在 AWS 上部署后,问题仍然存在。
我的 pom.xml 的摘录(适用于 Java、Spark、Scala 版本):
<spark.version>2.1.0</spark.version>
<scala.version>2.10.4</scala.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
您知道如何解决我的问题吗? 谢谢
【问题讨论】:
-
@som-snytt 我更新了提供 Scala、jdk 和 Spark 版本的问题
-
Spark 作业因 OOM 失败的方式有多种:一种常见的模式是在驱动程序上收集过多数据,或者在一组后的单个工作人员上收集过多数据被过度代表。如果没有有关您的转换的更多详细信息,很难为您提供帮助。作为起点,您应该查看 Spark GUI 并检查数据量是否在执行程序之间均匀分布。
-
@FurryMachine 谢谢!我一直在研究这个,但是当我在本地启动我的工作时也会发生这种情况。当我通过预处理(包括简单的特征计算 + one-hot)启动我的工作时会发生这种情况,如果我自己启动它就不会发生(我预处理,我将数据存储在我的驱动器上,然后启动另一个执行 cum-sum 的 JVM 实例)。我正在尝试执行的转换是一个 cum-sum 操作(每个人的每个特征随时间的累积和,它应该在 O(n) 中完成)。
-
哎呀,这样的事情可能发生在 Spark 上,而且没有太多工作可以找到原因。我猜想在链接操作时会发生某种不幸的优化。如果检查点您的中间结果有效,我会说“去做”。有时,仅仅添加一个persist() 我们有时只是一个show(),也可能会改变优化器的行为。这有时会让人非常沮丧。这就是为什么我总是喜欢简单的步骤并存储中间结果,这样我就可以分别查看、调试和修复每个部分。但这并不总是很有意义。
标签: scala apache-spark apache-spark-sql spark-dataframe bigdata