【问题标题】:Generate unique ID in a mutable pyspark data frame在可变 pyspark 数据框中生成唯一 ID
【发布时间】:2020-04-11 11:20:30
【问题描述】:

我想为可能发生变化的数据框生成顺序唯一 ID。当我说更改时,这意味着在我今天生成 ID 后明天将添加更多行数。当添加更多行时,我想查找具有生成的 id 的 id 列并为新添加的数据增加

+-------+--------------------+-------------+
|deal_id|           deal_name|Unique_id    |
+-------+--------------------+--------------
| 613760|ABCDEFGHI           |            1|    
| 613740|TEST123             |            2|             
| 598946|OMG                 |            3|    

如果我明天得到更多数据,我想将相同的数据附加到这个数据帧,唯一 id 应该增加到 4 并继续。

+-------+--------------------+-------------+
|deal_id|           deal_name|Unique_id    |
+-------+--------------------+--------------
| 613760|ABCDEFGHI           |            1|    
| 613740|TEST123             |            2|             
| 598946|OMG                 |            3|
| 591234|OM21                |            4|
| 988217|Otres               |            5|
.
.
.

代码片段

deals_df_final = deals_df.withColumn("Unique_id",F.monotonically_increasing_id())

但这并没有给出顺序 ID。

我可以尝试使用索引的 row_num 和 RDD zip,但看起来数据框将是不可变的。

有什么帮助吗?我希望能够在添加数据时生成并增加 id。

【问题讨论】:

  • Unique_id 列与数据框 index 有何不同?
  • 我对这个概念比较陌生,但如果我错了,请纠正我 - 我希望从 1 开始按顺序生成 unique_id 并且它应该始终与 deal_id 列相关联......不确定是否生成索引将用于此目的,但如果我错了,请再次纠正我。这就是问题的一部分——第二部分对我来说是一个棘手的部分,在为第一组数据生成唯一 ID 之后,我希望能够将数据附加到这个数据帧并为新附加的数据生成唯一 ID继续序列。再次不确定我们是否可以使用数据框索引来做到这一点
  • default dataframe index 是一个以 0 开头的自增“列”。可以使用 this 答案将索引更改为从 1 开始。跨度>
  • 是的,它会增加。
  • 是的,熊猫数据框无法在集群中运行。我正在寻找 pyspark 数据框中的解决方案。

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

如果有帮助,请非常简短地说明 - 我遇到了同样的问题,这篇文章中的第二个示例帮助了我:https://kb.databricks.com/sql/gen-unique-increasing-values.html

我当前正在进行的代码:

from pyspark.sql import (
    SparkSession,
    functions as F,
    window as W
)

df_with_increasing_id = df.withColumn("monotonically_increasing_id", F.monotonically_increasing_id())
window = W.Window.orderBy(F.col('monotonically_increasing_id'))
df_with_consecutive_increasing_id = df_with_increasing_id.withColumn('increasing_id', F.row_number().over(window))
    df = df_with_consecutive_increasing_id.drop('monotonically_increasing_id')
# now find the maximum value in the `increasing_id` column in the current dataframe before appending new
previous_max_id = df.agg({'increasing_id': 'max'}).collect()[0]
previous_max_id = previous_max_id['max(increasing_id)']
# CREATE NEW ROW HERE
# and then create new ids (same way as creating them originally)
# then union or vertically concatenate it with the old dataframe to get the combined one
df.withColumn("cnsecutiv_increase", F.col("increasing_id") + F.lit(previous_max_id)).show()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-21
    • 2012-07-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-02-22
    相关资源
    最近更新 更多