【问题标题】:Iterate Spark Dataframe running slow迭代 Spark Dataframe 运行缓慢
【发布时间】:2018-10-17 19:09:46
【问题描述】:

我想验证现有列的数据并根据特定条件创建新列。

问题:我有大约 500 列和 9K 行 (9000) 的数据集。根据我的逻辑,如果其中一列具有任何空值,则针对该列创建新列并将原始列的空值设置为 1,其余为 0。

但下面的简单代码需要几个小时才能完成,尽管我的数据并不大。

dataset_.schema.fields.map(c => {
  if(dataset_.filter(col(c.name).isNull).count() > 0)
  {
    dataset_ = dataset_.withColumn(c.name + "_isNull", when(col(c.name).isNull, 1).otherwise(0))
  }
})

请帮助我优化我的代码或提供反馈以用不同的方法实现它。

注意:我在大集群(火花纱)上尝试过同样的事情。 Google Dataproc 集群(3 个工作节点,机器类型 32 个 vCPU,280 GB 内存)

【问题讨论】:

    标签: scala apache-spark spark-dataframe apache-spark-mllib google-cloud-dataproc


    【解决方案1】:

    我尝试了很多方法...
    1) 从csv 文件或任何其他来源创建dataframe 时尝试缓存

    2) 另外,如果不影响逻辑,我们可以试试改这个 if(dataset_.filter(col(c.name).isNull).count() > 0)if(flightData.filter(col(x.name).isNull).take(1) != null )
    我们可以检查任何列名是否为空,而不是计算所有数据 因为take(1) 将在找到一条记录后立即继续运行,而.count() 将继续运行,然后将其与 0 进行比较

    3) 此外,按照目前的逻辑,我们可以将map 更改为foreach。但是,它不会影响性能,但理想情况下应该是 foreach。

    我已经在一个有 16 列和大约 100 万条记录的数据集上尝试了这些。全部应用后花了 33s。

    这是 Spark UI 快照!

    由于您有 500 列,因此与我的数据集相比,应用这些列的运行时间应该会大规模减少。
    我希望这会有所帮助!

    【讨论】:

      【解决方案2】:

      同时计算所有计数:

      val convert = df.select(
        df.columns.map(c => (count(c) =!= count("*")).alias(c)): _*
      ).first.getValuesMap[Boolean](df.columns)
      

      并使用结果添加列

      convert.collect { case (c, true) => c }.foldLeft(df) {
        (df, c) => df.withColumn(s"${c}_isNull", when(col(c).isNull, 1).otherwise(0))
      }
      

      【讨论】:

        猜你喜欢
        • 2018-10-24
        • 1970-01-01
        • 1970-01-01
        • 2022-06-18
        • 2017-12-22
        • 1970-01-01
        • 1970-01-01
        • 2015-07-25
        • 1970-01-01
        相关资源
        最近更新 更多