【问题标题】:Spark window function on dataframe with large number of columns具有大量列的数据帧上的 Spark 窗口函数
【发布时间】:2018-02-19 16:43:42
【问题描述】:

我有一个从 csv 文件中读取的 ML 数据框。它包含三种类型的列:

ID 时间戳 Feature1 Feature2...Feature_n

其中 n 约为 500(用 ML 术语来说是 500 个特征)。数据集中的总行数约为 1.6 亿。

由于这是之前完全连接的结果,因此有许多特征没有设置值。

我的目标是运行一个“填充”函数(fillna 样式的表单 python pandas),其中每个空的特征值都被设置为该列先前可用的值,每个 Id 和 Date。

我正在尝试使用以下 spark 2.2.1 代码来实现这一点:

 val rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)

 val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(-50000, -1)

 val columns = Array(...) //first 30 columns initially, just to see it working

val rawDataSetFilled = columns.foldLeft(rawDataset) { (originalDF, columnToFill) =>
      originalDF.withColumn(columnToFill, coalesce(col(columnToFill), last(col(columnToFill), ignoreNulls = true).over(window)))
    }

我正在使用 spark 2.2.1 在 Amazon EMR 上的 4 个 m4.large 实例上运行此作业。并启用动态分配。

作业运行超过 2 小时未完成。

我在代码级别做错了吗?鉴于数据的大小和实例,我认为它应该在合理的时间内完成?而且我什至没有尝试过完整的 500 列,只有大约 30 列!

查看容器日志,我看到的都是很多这样的日志:

INFO codegen.CodeGenerator:在 166.677493 毫秒内生成的代码

INFO 执行。ExternalAppendOnlyUnsafeRowArray:达到溢出 阈值 4096 行,切换到 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

我尝试将参数 spark.sql.windowExec.buffer.spill.threshold 设置为更大的值,没有任何影响。还有其他我应该知道的设置吗?这 2 行是我在任何容器日志中看到的唯一行。

在 Ganglia 中,我看到大多数 CPU 内核在完全使用时达到峰值,但内存使用量低于可用的最大值。所有执行者都已分配并正在工作。

【问题讨论】:

  • 你看执行计划了吗?如果您的数据框未按 ID 重新分区并按 Id 和 DATE 在分区中排序,则将在 foldLeft 之前进行洗牌和排序。这可能是原因吗?另外,限制真的需要-50000吗?也许您应该先尝试一些较小的值,例如 -10。

标签: apache-spark spark-dataframe


【解决方案1】:

我已经设法在不使用 withColumn 调用的情况下重写了 向左折叠 逻辑。显然,对于大量列,它们可能非常慢,因此我也遇到了 stackoverflow 错误。

我很想知道为什么会有如此巨大的差异 - 以及查询计划执行幕后究竟发生了什么,这使得重复 withColumns 调用如此缓慢。

证明非常有用的链接:Spark Jira issuethis stackoverflow question

    var rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)    
    val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    rawDataset = rawDataset.select(rawDataset.columns.map(column => coalesce(col(column), last(col(column), ignoreNulls = true).over(window)).alias(column)): _*)
    rawDataset.write.option("header", "true").csv(outputLocation)

【讨论】:

  • 不错的解决方案。现在有多快?
  • 您可以看到一篇关于此here 的非常好的博客文章,您还可以获得一个很好的基准。
猜你喜欢
  • 2016-02-28
  • 1970-01-01
  • 1970-01-01
  • 2017-07-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多