【问题标题】:How do I add an persistent column of row ids to Spark DataFrame?如何将行 ID 的持久列添加到 Spark DataFrame?
【发布时间】:2016-06-12 19:35:02
【问题描述】:

这个问题并不新鲜,但我在 Spark 中发现了令人惊讶的行为。我需要将一列行 ID 添加到 DataFrame。我使用了 DataFrame 方法 monotonically_increasing_id() 它确实给了我一个额外的唯一行 ID 列(顺便说一下,它们不是连续的,但是是唯一的)。

我遇到的问题是,当我过滤 DataFrame 时,结果 DataFrame 中的行 ID 被重新分配。这两个 DataFrame 如下所示。

  • 第一个是初始DataFrame,添加行ID如下:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • 第二个 DataFrame 是通过 df.filter(col("P")) 在 col P 上过滤后获得的。

custId 169 的 rowId 说明了问题,在初始 DataFrame 中为 5,但在过滤掉 custId 169 时,rowId (5) 被重新分配给 custmId 773!我不知道为什么这是默认行为。

我希望rowIds 具有“粘性”;如果我从 DataFrame 中删除行,我不希望它们的 ID “重复使用”,我希望它们与它们的行一起消失。有可能这样做吗?我没有看到任何从 monotonically_increasing_id 方法请求此行为的标志。

+---------+--------------------+-------+
| custId  |    features|    P  |rowId|
+---------+--------------------+-------+
|806      |[50,5074,...|   true|    0|
|832      |[45,120,1...|   true|    1|
|216      |[6691,272...|   true|    2|
|926      |[120,1788...|   true|    3|
|875      |[54,120,1...|   true|    4|
|169      |[19406,21...|  false|    5|

after filtering on P:
+---------+--------------------+-------+
|   custId|    features|    P  |rowId|
+---------+--------------------+-------+
|      806|[50,5074,...|   true|    0|
|      832|[45,120,1...|   true|    1|
|      216|[6691,272...|   true|    2|
|      926|[120,1788...|   true|    3|
|      875|[54,120,1...|   true|    4|
|      773|[3136,317...|   true|    5|

【问题讨论】:

  • 你能分享你生成两个示例数据帧的完整代码吗?就其价值而言,这可能是由于 SQL 查询优化发生的,其中“独立”映射阶段可能会重新排列。
  • Hamel,除了我发布的内容之外,真的没有其他转换或动作。显示的数据帧是 df.show() 的结果。您可以非常轻松地重新创建此行为,创建一个数据框并添加一个行 ID 列,然后向其中添加一个随机布尔列。然后对该列进行过滤,看看你从单调增加中获得的行 ID 是如何“重复使用”的。
  • @Kai 我实际上要补充一点,重现它的最简单方法是仅使用单个分区。
  • Spark 跟踪器上的问题:SPARK-14241
  • 感谢尼克接受这个。

标签: apache-spark dataframe apache-spark-sql


【解决方案1】:

Spark 2.0

  • 此问题已在 Spark 2.0 中通过 SPARK-14241 解决。

  • 另一个类似的问题已在 Spark 2.1 中解决,SPARK-14393

Spark 1.x

您遇到的问题相当微妙,但可以归结为一个简单的事实monotonically_increasing_id 是一个极其丑陋的功能。它显然不是纯粹的,它的价值取决于你完全无法控制的东西。

它不接受任何参数,因此从优化器的角度来看,它何时被调用并不重要,并且可以在所有其他操作之后推送。因此,您看到的行为。

如果您查看代码,您会发现这是通过使用Nondeterministic 扩展MonotonicallyIncreasingID 表达式来明确标记的。

我认为没有任何优雅的解决方案,但您可以处理此问题的一种方法是添加对过滤值的人为依赖。例如,像这样的 UDF:

from pyspark.sql.types import LongType
from pyspark.sql.functions import udf

bound = udf(lambda _, v: v, LongType()) 

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))  
  .where("P"))

一般来说,在RDD 上使用zipWithIndex 添加索引然后将其转换回DataFrame 会更简洁。


* 上面显示的解决方法在 Spark 2.x 中不再是有效的解决方案(也不是必需的),其中 Python UDF 是执行计划优化的主题。

【讨论】:

    【解决方案2】:

    我无法重现这个。我使用的是 Spark 2.0,所以可能行为已经改变,或者我没有和你做同样的事情。

    val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
    .toDF("name", "value","flag")
    .withColumn("rowd", monotonically_increasing_id())
    
    df.show
    
    val df2 = df.filter(col("flag")=== true)
    
    df2.show
    
    df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
    +-----+-----+-----+----+
    | name|value| flag|rowd|
    +-----+-----+-----+----+
    |  one|    1| true|   0|
    |  two|    2|false|   1|
    |three|    3| true|   2|
    | four|    4| true|   3|
    +-----+-----+-----+----+
    df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
    +-----+-----+----+----+
    | name|value|flag|rowd|
    +-----+-----+----+----+
    |  one|    1|true|   0|
    |three|    3|true|   2|
    | four|    4|true|   3|
    +-----+-----+----+----+
    

    【讨论】:

    【解决方案3】:

    要绕过 monotonically_increasing_id() 的移动评估,您可以尝试将数据帧写入磁盘,然后重新读取。那么 id 列现在只是一个正在读取的数据字段,而不是在管道中的某个点动态计算。虽然这是一个非常丑陋的解决方案,但当我进行快速测试时它就起作用了。

    【讨论】:

      【解决方案4】:

      这对我有用。创建另一个标识列并使用窗口函数 row_number

      import org.apache.spark.sql.functions.{row_number}
      import org.apache.spark.sql.expressions.Window
      
      val df1: DataFrame = df.withColumn("Id",lit(1))
      
      df1
      .select(
      ...,
      row_number()
      .over(Window
      .partitionBy("Id"
      .orderBy(col("...").desc))
      )
      .alias("Row_Nbr")
      )
      

      【讨论】:

      • 这可行,但会强制数据进入 1 个分区,从而移除分布,并可能导致大型数据集出现内存异常
      【解决方案5】:

      我最近正在研究一个类似的问题。虽然monotonically_increasing_id() 很快,但它并不可靠,不会给你连续的行号,只会增加唯一的整数。

      创建一个 windows 分区然后使用row_number().over(some_windows_partition) 非常耗时。

      目前最好的解决方案是使用带索引的压缩文件,然后将压缩文件转换回原始数据帧,新架构包括索引列。

      试试这个:

      from pyspark.sql import Row
      from pyspark.sql.types import StructType, StructField, LongType
      
      new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
      zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
      indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
      

      original_dataframedataframe,您必须在其中添加索引,row_with_index 是具有列索引的新架构,您可以将其写为

      row_with_index = Row(
      "calendar_date"
      ,"year_week_number"
      ,"year_period_number"
      ,"realization"
      ,"index"
      )
      

      这里,calendar_dateyear_week_numberyear_period_numberrealization 是我原来的 dataframe 的列。您可以将名称替换为列的名称。索引是您必须为行号添加的新列名。

      row_number().over(some_windows_partition) 方法相比,此过程在很大程度上更加高效和顺畅。

      希望这会有所帮助。

      【讨论】:

        【解决方案6】:

        为了在 Chris T 解决方案中获得更好的性能,您可以尝试写入 apache ignite 共享数据帧而不是写入磁盘。 https://ignite.apache.org/use-cases/spark/shared-memory-layer.html

        【讨论】:

          【解决方案7】:

          最好的方法是使用唯一键的 concat 散列。

          例如:在python中:

          from pyspark.sql.functions import concat, md5
          
          unique_keys = ['event_datetime', 'ingesttime']
          raw_df.withColumn('rowid', md5(concat(*unique_keys)))
          

          原因:

          【讨论】:

            猜你喜欢
            • 2015-08-11
            • 1970-01-01
            • 2016-02-06
            • 1970-01-01
            • 1970-01-01
            • 2017-05-10
            • 1970-01-01
            • 2021-06-11
            • 2019-03-02
            相关资源
            最近更新 更多