【问题标题】:Apache Spark: Get the first and last row of each partitionApache Spark:获取每个分区的第一行和最后一行
【发布时间】:2020-05-18 04:09:23
【问题描述】:

我想在 spark 中获取每个分区的第一行和最后一行(我正在使用 pyspark)。我该怎么做? 在我的代码中,我使用以下键列重新分区我的数据集:

mydf.repartition(keyColumn).sortWithinPartitions(sortKey)

有没有办法获取每个分区的第一行和最后一行? 谢谢

【问题讨论】:

  • 不知道为什么我的问题被否决了。帖子有问题吗?如果您在投反对票时可以就您认为帖子的错误之处发表评论,那将非常有帮助
  • 为什么要每个分区的第一行和最后一行?你可以使用foreachPartition,它会给你一个迭代器
  • 你的意思可能是mapPartitions @maximeG foreachPartition 不允许你修改最终输出
  • OP没有说要修改输出
  • 如何使用foreachPartition 提取第一个/最后一个?据我了解,问题是关于提取每个分区的第一个/最后一个项目,即通过 Spark API 提取 4 par -> 8 个项目,没有隐藏存储或任何 3rd 方库

标签: apache-spark pyspark pyspark-dataframes


【解决方案1】:

我强烈建议不要直接使用分区。 Spark 做了很多 DAG 优化,因此当您尝试在每个分区上执行特定功能时,您对分区及其分布的所有假设都可能完全错误。

但是,您似乎有 keyColumnsortKey,所以我建议您执行以下操作:

import pyspark
import pyspark.sql.functions as f

w_asc = pyspark.sql.Window.partitionBy(keyColumn).orderBy(f.asc(sortKey))
w_desc = pyspark.sql.Window.partitionBy(keyColumn).orderBy(f.desc(sortKey))
res_df = mydf. \
 withColumn("rn_asc", f.row_number().over(w_asc)). \
 withColumn("rn_desc", f.row_number().over(w_desc)). \
 where("rn_asc = 1 or rn_desc = 1")

生成的数据框将有 2 个额外的列,其中rn_asc=1 表示第一行,rn_desc=1 表示最后一行。

【讨论】:

  • Spark 开发人员通过 Spark API 公开分区是有充分理由的,原因是能够实现与此类似的案例。我们不需要在这里使用窗口函数,因为它会引入不必要的开销。 Spark 通过 mapPartitions 方法提供了一个迭代器,正是因为直接使用迭代器非常有效。 orderBy 或 partitionBy 会导致数据混洗,这是我们一直想要避免的。如果我理解正确,OP 要求不要触摸当前分区只是为了从现有分区中获取第一个/最后一个元素。
  • 好点 Alexandros :) 完全同意。我建议使用窗口函数的原因是因为我不相信 OP 有分区(因为它们正在重新分区输入数据帧),所以无论哪种方式都需要重新洗牌。
【解决方案2】:

Scala:我认为重新分区不是通过键列,但它需要整数,您想设置的分区方式。我通过使用 spark 的Window 函数来选择第一行和最后一行。

首先,这是我的测试数据。

+---+-----+
| id|value|
+---+-----+
|  1|    1|
|  1|    2|
|  1|    3|
|  1|    4|
|  2|    1|
|  2|    2|
|  2|    3|
|  3|    1|
|  3|    3|
|  3|    5|
+---+-----+

然后,我使用了两次Window 函数,因为我不能轻易知道最后一行,但反过来很容易。

import org.apache.spark.sql.expressions.Window
val a = Window.partitionBy("id").orderBy("value")
val d = Window.partitionBy("id").orderBy(col("value").desc)

val df = spark.read.option("header", "true").csv("test.csv")
df.withColumn("marker", when(rank.over(a) === 1, "Y").otherwise("N"))
  .withColumn("marker", when(rank.over(d) === 1, "Y").otherwise(col("marker")))
  .filter(col("marker") === "Y")
  .drop("marker").show

那么最后的结果就是,

+---+-----+
| id|value|
+---+-----+
|  3|    5|
|  3|    1|
|  1|    4|
|  1|    1|
|  2|    3|
|  2|    1|
+---+-----+

【讨论】:

    【解决方案3】:

    这是使用 RDD API 中的mapPartitions 的另一种方法。我们遍历每个分区的元素,直到我们到达终点。我希望这次迭代会非常快,因为我们跳过了除两条边之外的分区的所有元素。代码如下:

    df = spark.createDataFrame([
      ["Tom", "a"],
      ["Dick", "b"],
      ["Harry", "c"],
      ["Elvis", "d"],
      ["Elton", "e"],
      ["Sandra", "f"]
    ], ["name", "toy"])
    
    def get_first_last(it):
          first = last = next(it)
          for last in it:
            pass
    
          # Attention: if first equals last by reference return only one!
          if first is last:
            return [first]
    
          return [first, last]
    
    # coalesce here is just for demonstration
    first_last_rdd = df.coalesce(2).rdd.mapPartitions(get_first_last)
    
    spark.createDataFrame(first_last_rdd, ["name", "toy"]).show()
    
    # +------+---+
    # |  name|toy|
    # +------+---+
    # |   Tom|  a|
    # | Harry|  c|
    # | Elvis|  d|
    # |Sandra|  f|
    # +------+---+
    

    PS:奇数位置将包含第一个分区元素,偶数位置将包含最后一个元素。另请注意,结果的数量将是 (numPartitions * 2) - numPartitionsWithOneItem,我预计它会相对较小,因此您不必担心新的 createDataFrame 语句的成本。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-09-29
      • 2014-11-10
      • 2019-05-24
      • 2017-03-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多