【发布时间】:2017-06-17 11:30:24
【问题描述】:
大家好,我有一个函数可以从一些 S3 位置加载数据集并返回有趣的数据
private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
// pick rows for the given marketplaces
.where($"mid".isin(mids: _*))
// pick rows for the given indices
.where($"index".isin(indices: _*))
}
如果有人提供mids = Seq() 或indices = Seq(),此实现将过滤掉所有内容。另一方面,我希望语义是“仅当 mids 不为空时才应用此 where 子句”(indices 相同),这样如果函数的用户提供空序列,则不会发生过滤。
有没有很好的功能性方法来做到这一点?
【问题讨论】:
标签: scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset