【Question Title】: Spark Caused by: java.lang.StackOverflowError Window Function?
【Posted】: 2019-10-15 13:10:18
【Question】:

I am hitting an error that I believe is caused by a window function.

When I run this script on just a few sample rows it works fine, but when I apply it to my full dataset (only a few GB) and try to persist to HDFS, it fails at the last step with this strange error... The script works when I persist without the window function, so the problem must come from it (I have roughly 325 feature columns being processed in a for loop).

Any idea what is causing this? My goal is to impute the time-series data for every variable in my DataFrame with a forward-fill approach.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
import sys

spark = SparkSession.builder.getOrCreate()
print(spark.version)
# '2.3.0'

# sample data
df = spark.createDataFrame([('2019-05-10 7:30:05', '1', '10', '0.5', 'FALSE'),\
                            ('2019-05-10 7:30:10', '2', 'UNKNOWN', '0.24', 'FALSE'),\
                            ('2019-05-10 7:30:15', '3', '6', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:20', '4', '7', 'UNKNOWN', 'UNKNOWN'),\
                            ('2019-05-10 7:30:25', '5', '10', '1.1', 'UNKNOWN'),\
                            ('2019-05-10 7:30:30', '6', 'UNKNOWN', '1.1', 'NULL'),\
                            ('2019-05-10 7:30:35', '7', 'UNKNOWN', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:49', '8', '50', 'UNKNOWN', 'UNKNOWN')], ["date", "id", "v1", "v2", "v3"])

df = df.withColumn("date", F.col("date").cast("timestamp"))

# imputer process / all cols that need filling are strings
def stringReplacer(x, y):
    return F.when(x != y, x).otherwise(F.lit(None))  # replace the sentinel value with NULL

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    for i in cols:
        window = Window\
            .partitionBy(F.month(partitioner))\
            .orderBy(partitioner)\
            .rowsBetween(-sys.maxsize, 0)

        # replace the sentinel with NULL, then forward-fill the resulting NULLs
        df = df.withColumn(i, stringReplacer(F.col(i), value))
        fill = F.last(df[i], ignorenulls=True).over(window)
        df = df.withColumn(i, fill)
    return df
df2 = forwardFillImputer(df, cols=[i for i in df.columns])

# errors here
df2\
.write\
.format("csv")\
.mode("overwrite")\
.option("header", "true")\
.save("test_window_func.csv")
Py4JJavaError: An error occurred while calling o13504.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:381)

Possible working solution

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    window = Window \
     .partitionBy(F.month(partitioner)) \
     .orderBy(partitioner) \
     .rowsBetween(-sys.maxsize, 0)
    imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i) 
                    for i in cols]
    missing_cols = [i for i in df.columns if i not in cols]
    return df.select(missing_cols+imputed_cols)

df2 = forwardFillImputer(df, cols=[i for i in df.columns[1:]])

df2.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- v3: string (nullable = true)

df2.show()
+-------------------+---+---+----+-----+
|               date| id| v1|  v2|   v3|
+-------------------+---+---+----+-----+
|2019-05-10 07:30:05|  1| 10| 0.5|FALSE|
|2019-05-10 07:30:10|  2| 10|0.24|FALSE|
|2019-05-10 07:30:15|  3|  6|0.24| TRUE|
|2019-05-10 07:30:20|  4|  7|0.24| TRUE|
|2019-05-10 07:30:25|  5| 10| 1.1| TRUE|
|2019-05-10 07:30:30|  6| 10| 1.1| NULL|
|2019-05-10 07:30:35|  7| 10| 1.1| TRUE|
|2019-05-10 07:30:49|  8| 50| 1.1| TRUE|
+-------------------+---+---+----+-----+

【Question Comments】:

    Tags: python scala apache-spark pyspark


    【Solution 1】:

    From the stack trace provided, I believe the error comes from the preparation of the execution plan, as indicated by:

    Caused by: java.lang.StackOverflowError
        at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)
    

    I believe this happens because you call .withColumn twice inside the loop. What .withColumn does in the Spark execution plan is basically a select of all columns, with the one column changed as specified in the method. If you have 325 columns, then a single iteration calls select over those 325 columns twice -> 650 columns passed into the planner. Doing this 325 times, you can see how the overhead builds up.
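    A small sketch of that equivalence, just to make the plan growth concrete (F.upper and the three toy columns here are purely illustrative, not your imputation logic):

    # Conceptually, these two lines produce the same projection in the plan:
    df_a = df.withColumn("v1", F.upper(F.col("v1")))
    df_b = df.select(*[F.upper(F.col(c)).alias(c) if c == "v1" else F.col(c)
                       for c in df.columns])

    # Chaining withColumn in a loop nests one such projection inside another
    # on every pass, so with ~325 columns and two calls per iteration the
    # analyzed plan ends up extremely deep:
    df_loop = df
    for c in ["v1", "v2", "v3"]:   # imagine ~325 feature columns here
        df_loop = df_loop.withColumn(c, F.upper(F.col(c)))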

    Interestingly, though, you don't get this error on the small sample, which is a bit unexpected.

    In any case, you can try replacing your forwardFillImputer with something like this:

    def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
        window = Window \
         .partitionBy(F.month(partitioner)) \
         .orderBy(partitioner) \
         .rowsBetween(-sys.maxsize, 0)
    
        imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i) 
                        for i in cols]
    
        missing_cols = [F.col(i) for i in df.columns if i not in cols]
    
        return df.select(missing_cols + imputed_cols)
    

    This way you basically parse only a single select statement into the planner, which should be easier to handle.
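    If you want to verify that, comparing the plans of the two versions should show the difference (assuming you keep your original loop-based function around under a hypothetical name such as forwardFillImputerLoop):

    # The loop-based version yields a long chain of nested Project nodes,
    # while the single-select version stays much shallower.
    forwardFillImputerLoop(df, cols=df.columns[1:]).explain(True)
    forwardFillImputer(df, cols=df.columns[1:]).explain(True)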

    As a word of warning, Spark generally does not handle very large numbers of columns well, so you may see other strange issues along the way.

    【Comments】:

    • Thanks for the tip! I'll try this tomorrow and let you know how it goes... Do you know of any documentation explaining why Spark performs poorly with many columns?
    • Unfortunately no documentation, sorry :( It's just my experience from working with DataFrames that have 1000+ columns. I believe it's because Spark doesn't scale well in the number of columns; it mainly scales in rows, and partitions the DataFrame by rows rather than by columns. Every row, however, carries all of the selected columns, so performance degrades. Also, with a lot of columns you may hit the JVM limits mentioned here: stackoverflow.com/questions/44557739/…
    • You're right, the alias was missing. I've just updated the code, is it still returning that nonsense?
    • Thanks! I can see the alias works fine now... I'll test this on the large dataset tomorrow. What is the purpose of missing_cols? Ideally I don't want to impute my partitioning column, so for the output I changed it to df2 = forwardFillImputer(df, cols=[i for i in df.columns[1:]])
    • Added a possible solution to the question