【问题标题】:Duplicate row in PySpark Dataframe based off value in another columnPySpark Dataframe 中的重复行基于另一列中的值
【发布时间】:2017-05-20 06:25:31
【问题描述】:

我有一个如下所示的数据框:

ID    NumRecords
123   2
456   1
789   3

我想创建一个新的数据框,它连接两列并根据 NumRecords 中的值复制行

所以输出应该是

ID_New  123-1
ID_New  123-2
ID_New  456-1
ID_New  789-1
ID_New  789-2
ID_New  789-3

我正在研究“爆炸”功能,但根据我看到的示例,它似乎只需要一个常数。

【问题讨论】:

标签: dataframe duplicates pyspark


【解决方案1】:

我遇到了类似的问题,此代码将根据 NumRecords 列中的值复制行:

from pyspark.sql import Row


def duplicate_function(row):
    data = []  # list of rows to return
    to_duplicate = float(row["NumRecords"])

    i = 0
    while i < to_duplicate:
        row_dict = row.asDict()  # convert a Spark Row object to a Python dictionary
        row_dict["SERIAL_NO"] = str(i)
        new_row = Row(**row_dict)  # create a Spark Row object based on a Python dictionary
        to_return.append(new_row)  # adds this Row to the list
        i += 1

    return data  # returns the final list


# create final dataset based on value in NumRecords column
df_flatmap = df_input.rdd.flatMap(duplicate_function).toDF(df_input.schema)

【讨论】:

    【解决方案2】:

    你可以使用udf

    from pyspark.sql.functions import udf, explode, concat_ws
    from pyspark.sql.types import *
    
    range_ = udf(lambda x: [str(y) for y in range(1, x + 1)], ArrayType(StringType()))
    
    df.withColumn("records", range_("NumRecords") \
      .withColumn("record", explode("records")) \
      .withColumn("ID_New", concat_ws("-", "id", "record"))
    

    【讨论】:

    • 我在 df.withColumn("records", range_("NumRecords") 上得到“元组对象不可调用”
    猜你喜欢
    • 2020-10-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多