【Question title】: PySpark - Losing String values when Creating Key Value Pairs
【Posted】: 2018-03-19 10:14:13
【Question description】:

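I need to create key-value pairs for each row in a DataFrame/RDD. That is, the person becomes the key for each row, and their associated transactions become a list that is the value.

The following example illustrates my problem: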
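To make the goal concrete, here is a plain-Python sketch (no Spark involved) of the target shape, using a few rows from the example data: each person maps to a list of [Amount, Budget, Date] entries. The helper name group_transactions is hypothetical and only illustrates the desired result.

```python
from collections import defaultdict

def group_transactions(rows):
    """Group (person, amount, budget, date) tuples into {person: [[amount, budget, date], ...]}."""
    grouped = defaultdict(list)
    for person, amount, budget, date in rows:
        # Keep the mixed-type fields of one transaction together in an inner list
        grouped[person].append([amount, budget, date])
    return dict(grouped)

rows = [
    ('Bob', 562, "Food", "12 May 2018"),
    ('Sue', 85, 'Household', " 16 July 2018"),
    ('Bob', 880, "Food", "01 June 2018"),
]
print(group_transactions(rows))
# {'Bob': [[562, 'Food', '12 May 2018'], [880, 'Food', '01 June 2018']], 'Sue': [[85, 'Household', ' 16 July 2018']]}
```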

a = [
    ('Bob', 562,"Food", "12 May 2018"),
    ('Bob',880,"Food","01 June 2018"),
    ('Bob',380,'Household'," 16 June 2018"),
    ('Sue',85,'Household'," 16 July 2018"),
    ('Sue',963,'Household'," 16 Sept 2018")
] 
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])

Then I create a function to build a key-value pair for each row:

def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs

    """
    return (row["Person"], [[row["Amount"], row["Budget"],
                                 row["Date"]]])
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))

However, when I display the result, Budget and Date become null. I think this is because they are string values.

person_summarries_rdd.toDF().show(5,False)
+---+-------------------------------+
|_1 |_2                             |
+---+-------------------------------+
|Bob|[WrappedArray(562, null, null)]|
|Bob|[WrappedArray(880, null, null)]|
|Bob|[WrappedArray(380, null, null)]|
|Sue|[WrappedArray(85, null, null)] |
|Sue|[WrappedArray(963, null, null)]|
+---+-------------------------------+

I need to keep the string values while still using this approach.

【Question discussion】:

    Tags: apache-spark pyspark rdd


    【Solution 1】:

    There is no need to serialize to an RDD. You can use pyspark.sql.functions.struct():

    import pyspark.sql.functions as f
    # struct() keeps each field's own type, so the strings are preserved
    df.withColumn('values', f.struct(f.col('Amount'), f.col('Budget'), f.col('Date')))\
        .select('Person', 'values').show(truncate=False)
    #+------+-----------------------------+
    #|Person|values                       |
    #+------+-----------------------------+
    #|Bob   |[562,Food,12 May 2018]       |
    #|Bob   |[880,Food,01 June 2018]      |
    #|Bob   |[380,Household, 16 June 2018]|
    #|Sue   |[85,Household, 16 July 2018] |
    #|Sue   |[963,Household, 16 Sept 2018]|
    #+------+-----------------------------+
    

    Or, using a list comprehension:

    array_columns = [c for c in df.columns if c != 'Person']
    df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\
        .select('Person', 'values').show(truncate=False)
    #+------+-----------------------------+
    #|Person|values                       |
    #+------+-----------------------------+
    #|Bob   |[562,Food,12 May 2018]       |
    #|Bob   |[880,Food,01 June 2018]      |
    #|Bob   |[380,Household, 16 June 2018]|
    #|Sue   |[85,Household, 16 July 2018] |
    #|Sue   |[963,Household, 16 Sept 2018]|
    #+------+-----------------------------+
    

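    Your code doesn't work because a WrappedArray() can't hold mixed types. Spark infers the array's element type from the first item (Amount, an integer), so the string values that follow can't be stored as that type and come back as null.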
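    The failure mode can be illustrated with a plain-Python toy model. This is a simplification, not Spark's actual conversion code: the array's element type is taken from its first non-None item, and any item that doesn't match that type is replaced by None (which Spark displays as null). The helper names are hypothetical.

```python
def infer_element_type(values):
    """Toy model: pick the array element type from the first non-None item."""
    for v in values:
        if v is not None:
            return type(v)
    return type(None)

def convert_like_typed_array(values):
    """Toy model: items that don't match the inferred type become None (null)."""
    element_type = infer_element_type(values)
    return [v if isinstance(v, element_type) else None for v in values]

print(convert_like_typed_array([562, "Food", "12 May 2018"]))    # [562, None, None]
print(convert_like_typed_array(["562", "Food", "12 May 2018"]))  # ['562', 'Food', '12 May 2018']
```

    The first call mirrors the nulls in the question's output; the second shows why making every element a string avoids them.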

    You can convert Amount to a str:

    def make_keys_and_value(row):
        """ Convert the dataframe rows into key value pairs
    
        """
        return (row["Person"], [[str(row["Amount"]), row["Budget"],
                                     row["Date"]]])
    person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
    person_summarries_rdd.toDF().show(truncate=False)
    #+---+---------------------------------------------+
    #|_1 |_2                                           |
    #+---+---------------------------------------------+
    #|Bob|[WrappedArray(562, Food, 12 May 2018)]       |
    #|Bob|[WrappedArray(880, Food, 01 June 2018)]      |
    #|Bob|[WrappedArray(380, Household,  16 June 2018)]|
    #|Sue|[WrappedArray(85, Household,  16 July 2018)] |
    #|Sue|[WrappedArray(963, Household,  16 Sept 2018)]|
    #+---+---------------------------------------------+
    
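    If more than one value column is not a string, the same idea can be applied to every value column at once. A minimal sketch, assuming the key and value column names are passed in explicitly; make_keys_and_string_values is a hypothetical name. It only relies on row[column] indexing, which both pyspark.sql.Row and a plain dict support, so it can be tested locally without Spark.

```python
def make_keys_and_string_values(row, key_column, value_columns):
    """Return (key, [[str(v1), str(v2), ...]]) so the inner list has a single element type."""
    # Note: a null value becomes the string 'None' here
    values = [str(row[c]) for c in value_columns]
    return (row[key_column], [values])

row = {"Person": "Bob", "Amount": 562, "Budget": "Food", "Date": "12 May 2018"}
print(make_keys_and_string_values(row, "Person", ["Amount", "Budget", "Date"]))
# ('Bob', [['562', 'Food', '12 May 2018']])
```

    With Spark this could be called as df.rdd.map(lambda r: make_keys_and_string_values(r, 'Person', ['Amount', 'Budget', 'Date'])).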

    Or use a tuple instead of a list:

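    def make_keys_and_value(row):
        """ Convert the dataframe rows into key value pairs
    
        """
        # A tuple is inferred as a struct, whose fields can each have their own type
        return (row["Person"], (row["Amount"], row["Budget"], row["Date"]))
    person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
    person_summarries_rdd.toDF().show(truncate=False)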
    #+---+-----------------------------+
    #|_1 |_2                           |
    #+---+-----------------------------+
    #|Bob|[562,Food,12 May 2018]       |
    #|Bob|[880,Food,01 June 2018]      |
    #|Bob|[380,Household, 16 June 2018]|
    #|Sue|[85,Household, 16 July 2018] |
    #|Sue|[963,Household, 16 Sept 2018]|
    #+---+-----------------------------+
    

    Here I removed the nested [], but you can easily add it back if you want the output to look like [[562,Food,12 May 2018]] instead of [562,Food,12 May 2018].
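    Keeping (or re-adding) the nested [] is useful if the end goal is one list of transactions per person: the single-element lists can then be concatenated per key, for example with person_summarries_rdd.reduceByKey(lambda a, b: a + b). The plain-Python sketch below mimics that merge locally; merge_by_key is a hypothetical stand-in, not a Spark API.

```python
def merge_by_key(pairs, combine):
    """Local stand-in for RDD.reduceByKey: fold the values of each key with `combine`."""
    merged = {}
    for key, value in pairs:
        merged[key] = combine(merged[key], value) if key in merged else value
    return merged

pairs = [
    ("Bob", [(562, "Food", "12 May 2018")]),
    ("Bob", [(880, "Food", "01 June 2018")]),
    ("Sue", [(85, "Household", " 16 July 2018")]),
]
# Concatenating the single-element lists builds one list of transactions per person.
# In Spark, the order of the transactions inside each merged list is not guaranteed.
print(merge_by_key(pairs, lambda a, b: a + b))
# {'Bob': [(562, 'Food', '12 May 2018'), (880, 'Food', '01 June 2018')], 'Sue': [(85, 'Household', ' 16 July 2018')]}
```

    Without the nested [], a + b would concatenate the tuples into one flat tuple, which is why the extra level of nesting matters for this step.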


    Another option is to create a map with pyspark.sql.functions.create_map():

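    from functools import reduce  # reduce is not a builtin in Python 3

    # create_map takes alternating key/value arguments, so flatten the [lit, col] pairs
    df.withColumn(
        'values',
        f.create_map(
            *reduce(
                list.__add__,
                [[f.lit(c), f.col(c)] for c in array_columns]
            )
        )
    ).select('Person', 'values').show(truncate=False)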
    #+------+--------------------------------------------------------------+
    #|Person|values                                                        |
    #+------+--------------------------------------------------------------+
    #|Bob   |Map(Amount -> 562, Budget -> Food, Date -> 12 May 2018)       |
    #|Bob   |Map(Amount -> 880, Budget -> Food, Date -> 01 June 2018)      |
    #|Bob   |Map(Amount -> 380, Budget -> Household, Date ->  16 June 2018)|
    #|Sue   |Map(Amount -> 85, Budget -> Household, Date ->  16 July 2018) |
    #|Sue   |Map(Amount -> 963, Budget -> Household, Date ->  16 Sept 2018)|
    #+------+--------------------------------------------------------------+
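
    create_map expects its arguments as alternating keys and values (key1, value1, key2, value2, ...), which is why the list of [f.lit(c), f.col(c)] pairs is flattened with reduce(list.__add__, ...). The plain-Python sketch below shows the same flattening with strings standing in for the Column objects, plus an equivalent using itertools.chain.from_iterable.

```python
from functools import reduce
from itertools import chain

array_columns = ["Amount", "Budget", "Date"]
# Strings stand in for f.lit(c) and f.col(c) so this runs without Spark
pairs = [[f"lit({c})", f"col({c})"] for c in array_columns]

flat_reduce = reduce(list.__add__, pairs)
flat_chain = list(chain.from_iterable(pairs))

print(flat_reduce)
# ['lit(Amount)', 'col(Amount)', 'lit(Budget)', 'col(Budget)', 'lit(Date)', 'col(Date)']
print(flat_reduce == flat_chain)  # True
```

    chain.from_iterable avoids building a new intermediate list at every step of the reduce, though with three columns the difference is negligible.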
    

    Or, if you want to go directly to a Person -> struct mapping:

    df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\
        .withColumn('map',f.create_map(f.col('Person'), f.col('values')))\
        .select('map')\
        .show(truncate=False)
    #+-----------------------------------------+
    #|map                                      |
    #+-----------------------------------------+
    #|Map(Bob -> [562,Food,12 May 2018])       |
    #|Map(Bob -> [880,Food,01 June 2018])      |
    #|Map(Bob -> [380,Household, 16 June 2018])|
    #|Map(Sue -> [85,Household, 16 July 2018]) |
    #|Map(Sue -> [963,Household, 16 Sept 2018])|
    #+-----------------------------------------+
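
    Note that create_map(f.col('Person'), f.col('values')) builds one single-entry map per row, so Bob still appears in three separate maps rather than in one map holding all of his transactions. Collecting everything per person needs a grouping step, for example df.groupBy('Person') with f.collect_list on the struct column. The plain-Python analogue below is only an illustration, with tuples standing in for the struct.

```python
rows = [
    ("Bob", 562, "Food", "12 May 2018"),
    ("Bob", 880, "Food", "01 June 2018"),
    ("Sue", 85, "Household", " 16 July 2018"),
]
# One {Person: (Amount, Budget, Date)} map per row, mirroring create_map(Person, values)
per_row_maps = [{person: (amount, budget, date)} for person, amount, budget, date in rows]
print(per_row_maps)
print(len(per_row_maps))  # 3 maps, even though there are only 2 people
```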
    

    【Discussion】:
