【问题标题】:Partition Location of RDD/DataframeRDD/Dataframe 的分区位置
【发布时间】:2026-01-26 15:15:01
【问题描述】:

我有一个(相当大,想想 10e7 行)DataFrame,我根据某些属性从中过滤元素

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldName.myValue) 

我的 DataFrame 有 n 个 Partitions data.rdd.getNumPartitions

现在我想知道我的行来自哪个分区。我知道我可以像这样遍历所有分区

val temp = res.first() //or foreach, this is just an example
data.foreachPartition(f => {
    f.exists(row => row.get(0)==temp.get(0))
    //my code here
}) //compare PKs

data.rdd.mapPartitionsWithIndex((idx, f) => ...)

但是,如果我的结果和我的 DataFrame 变大,这似乎过度而且性能也不是很好。

在我执行了 filter() 操作之后,是否有 Spark 方式来执行此操作?

或者,有没有办法重写/替代 filter() 语句,以便它返回行的原点?

我也可以将分区位置保存在我的 DataFrame 中并在重新分区时更新它,但我宁愿以火花方式进行

(我发现的唯一类似问题是here,无论是问题还是评论都不是很有帮助。我还发现this可能相似但不一样)

在此先感谢您的任何帮助/指点,如果我错过了与我类似的已回答的问题,我深表歉意。

【问题讨论】:

  • mapPartitionsWithIndex 是一个简单的地图操作。它不涉及洗牌,只是分布式映射。可能还有另一种方法,但我不确定它是否真的比这更有效。

标签: apache-spark apache-spark-sql rdd spark-dataframe hadoop-partitioning


【解决方案1】:

分区数/计数不稳定,因为 Spark 将在分区中执行自动扩展和缩减。这意味着输入分区计数可能与输入文件计数不同。

这些情况下的一般模式是根据每个输入文件中的数据创建某种类型的复合键。如果密钥很大,您可以对其进行散列以减小大小。如果您不太关心碰撞,请使用Murmur3。如果您担心碰撞,请使用MD5,它仍然很快。

如果您拥有的唯一独特功能是输入文件的路径,则必须将文件路径添加为区分列。这是一种方法:

val paths = Seq(...)
val df = paths
  .map { path => 
    sqlContext.read.parquet(path)
      .withColumn("path", lit(path))
  }
  .reduceLeft(_ unionAll _)

这个想法很简单:一次读取一个输入文件,添加一个与之关联的唯一列,然后使用UNION ALL 将它们组合在一起。

【讨论】: