【问题标题】:Pyspark: Split multiple array columns into rowsPyspark:将多个数组列拆分为行
【发布时间】:2017-04-22 23:28:34
【问题描述】:

我有一个数据框,它有一行和几列。一些列是单个值,而其他列是列表。所有列表列的长度相同。我想将每个列表列拆分为单独的行,同时保持所有非列表列不变。

样本 DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

我想要什么:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

如果我只有一个列表列,只需执行explode 即可轻松完成:

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

但是,如果我也尝试 explode c 列,我最终会得到一个长度为我想要的平方的数据框:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

我想要的是 - 对于每一列,获取该列中数组的第 n 个元素并将其添加到新行中。我已经尝试在数据框中的所有列上映射一个爆炸,但这似乎也不起作用:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    火花 >= 2.4

    您可以将zip_ udf 替换为arrays_zip 函数

    from pyspark.sql.functions import arrays_zip, col, explode
    
    (df
        .withColumn("tmp", arrays_zip("b", "c"))
        .withColumn("tmp", explode("tmp"))
        .select("a", col("tmp.b"), col("tmp.c"), "d"))
    

    火花

    使用 DataFrames 和 UDF:

    from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
    from pyspark.sql.functions import col, udf, explode
    
    zip_ = udf(
      lambda x, y: list(zip(x, y)),
      ArrayType(StructType([
          # Adjust types to reflect data types
          StructField("first", IntegerType()),
          StructField("second", IntegerType())
      ]))
    )
    
    (df
        .withColumn("tmp", zip_("b", "c"))
        # UDF output cannot be directly passed to explode
        .withColumn("tmp", explode("tmp"))
        .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
    

    RDDs:

    (df
        .rdd
        .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
        .toDF(["a", "b", "c", "d"]))
    

    由于 Python 通信开销,这两种解决方案都效率低下。如果数据大小是固定的,你可以这样做:

    from functools import reduce
    from pyspark.sql import DataFrame
    
    # Length of array
    n = 3
    
    # For legacy Python you'll need a separate function
    # in place of method accessor 
    reduce(
        DataFrame.unionAll, 
        (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
            for i in range(n))
    ).toDF("a", "b", "c", "d")
    

    甚至:

    from pyspark.sql.functions import array, struct
    
    # SQL level zip of arrays of known size
    # followed by explode
    tmp = explode(array(*[
        struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
        for i in range(n)
    ]))
    
    (df
        .withColumn("tmp", tmp)
        .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
    

    与 UDF 或 RDD 相比,这应该明显更快。泛化为支持任意数量的列:

    # This uses keyword only arguments
    # If you use legacy Python you'll have to change signature
    # Body of the function can stay the same
    def zip_and_explode(*colnames, n):
        return explode(array(*[
            struct(*[col(c).getItem(i).alias(c) for c in colnames])
            for i in range(n)
        ]))
    
    df.withColumn("tmp", zip_and_explode("b", "c", n=3))
    

    【讨论】:

    • Spark >= 2.4 的解决方案如何真正起作用?文档说爆炸输入“应该是数组或映射类型,而不是字符串”,字面上引用了它引发的异常。 spark.apache.org/docs/latest/api/python/…
    • 你如何处理不同列中大小不均的列表。要求将值替换为 -1 以获得更短的列表。现在它显示为 null。
    【解决方案2】:

    一个班轮(对于Spark>=2.4.0):

    df.withColumn("bc", arrays_zip("b","c"))
      .select("a", explode("bc").alias("tbc"))
      .select("a", col"tbc.b", "tbc.c").show()
    

    需要导入:

    from pyspark.sql.functions import arrays_zip


    步骤-

    1. 创建一个 bc 列,它是 array_zipbc
    2. 分解bc 得到一个结构tbc
    3. 选择所需的列abc(全部按要求展开)。

    输出:

    > df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    |  1|  1|  7|
    |  1|  2|  8|
    |  1|  3|  9|
    +---+---+---+
    

    【讨论】:

      【解决方案3】:

      您需要使用flatMap,而不是map,因为您想从每个输入行中生成多个输出行。

      from pyspark.sql import Row
      def dualExplode(r):
          rowDict = r.asDict()
          bList = rowDict.pop('b')
          cList = rowDict.pop('c')
          for b,c in zip(bList, cList):
              newDict = dict(rowDict)
              newDict['b'] = b
              newDict['c'] = c
              yield Row(**newDict)
      
      df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
      

      【讨论】:

      • 如果第一个 df 有 3 个值,而第二个 df 有 2 个值,我们的 zip 恰好返回两对而不是 3。你能建议一下吗?
      • Zip 将 obj 的第一个元素与另一个对象的第一个元素、第二个与第二个等配对在一起,直到其中一个对象的元素用完。在您的情况下,在 2 个值之后。换句话说,它将对元素进行配对,直到没有更多要配对的项目。要给出任何建议,我需要知道您希望您的程序如何处理未配对的元素(例如,您想要第二组中的 null 吗?)。此外,此示例中只有 1 个 df。如果您的问题与这个不同,最好再问一个问题
      • 感谢@David 的回复。我想到了。使用 Izip 帮助解决了这个问题。但我仍然感谢你的回应伙伴。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-25
      • 1970-01-01
      • 2021-06-30
      相关资源
      最近更新 更多