【问题标题】:Conditional application of `filter`/`where` to a Spark `Dataset`/`Dataframe``filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`
【发布时间】:2017-06-17 11:30:24
【问题描述】:

大家好,我有一个函数可以从一些 S3 位置加载数据集并返回有趣的数据

private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  // pick rows for the given marketplaces
  .where($"mid".isin(mids: _*))
  // pick rows for the given indices
  .where($"index".isin(indices: _*))

}

如果有人提供mids = Seq()indices = Seq(),此实现将过滤掉所有内容。另一方面,我希望语义是“仅当 mids 不为空时才应用此 where 子句”(indices 相同),这样如果函数的用户提供空序列,则不会发生过滤。

有没有很好的功能性方法来做到这一点?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset


    【解决方案1】:

    如果您不介意稍微复杂的逻辑,Raphael Roth 的回答对于应用过滤器的特定问题是一个不错的选择。适用于任何条件转换的通用解决方案(不仅仅是过滤,也不仅仅是在某个决策分支上什么都不做)是使用transform,例如,

    spark
      .sparkContext.textFile(s3BrowseIndex)
      // split text dataset
      .map(line => line.split("\\s+"))
      // get types for attributes
      .map(BrowseIndex.strAttributesToBrowseIndex)
      // cast it to a dataset (requires implicit conversions)
      .toDS()
      .transform { ds =>
        // pick rows for the given marketplaces
        if (mids.isEmpty) ds
        else ds.where($"mid".isin(mids: _*))
      }
      .transform { ds =>
        // pick rows for the given indices
        if (indices.isEmpty) ds
        else ds.where($"index".isin(indices: _*))
      }
    

    如果您使用的是稳定类型的数据集(或数据帧,Dataset[Row]),transform 可能非常有用,因为您可以构建转换函数序列然后应用它们:

    transformations.foldLeft(ds)(_ transform _)
    

    在许多情况下,这种方法有助于代码重用和可测试性。

    【讨论】:

      【解决方案2】:

      您可以使用短路评估,这应该仅在提供 Seqs 不为空时应用过滤器:

      import org.apache.spark.sql.functions.lit
      
      spark
          .sparkContext.textFile(s3BrowseIndex)
          // split text dataset
          .map(line => line.split("\\s+"))
          // get types for attributes
          .map(BrowseIndex.strAttributesToBrowseIndex)
          // cast it to a dataset (requires implicit conversions)
          .toDS()
          // pick rows for the given marketplaces
          .where(lit(mids.isEmpty) or $"mid".isin(mids: _*))
          // pick rows for the given indices
          .where(lit(indices.isEmpty) or $"index".isin(indices: _*))
      

      【讨论】:

      • 感谢 Raphael,您的解决方案有效。我投了赞成票!我将选择 Sim 的答案作为这个问题的答案,尽管它具有普遍性和更简单的逻辑。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-04-10
      • 1970-01-01
      • 2017-02-16
      • 2017-04-03
      • 1970-01-01
      • 1970-01-01
      • 2016-09-13
      相关资源
      最近更新 更多