【问题标题】:Split Spark Dataframe string column into multiple columns将 Spark Dataframe 字符串列拆分为多列
【发布时间】:2017-01-07 05:21:38
【问题描述】:

我看到很多人建议Dataframe.explode 是一种有用的方法,但它会导致比原始数据框更多的行,这根本不是我想要的。我只是想做非常简单的 Dataframe 等效:

rdd.map(lambda row: row + [row.my_str_col.split('-')])

它看起来像:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

并将其转换为:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

我知道pyspark.sql.functions.split(),但它会生成一个嵌套数组列,而不是我想要的两个顶级列。

理想情况下,我希望这些新列也被命名。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    pyspark.sql.functions.split() 是正确的方法 - 您只需将嵌套的 ArrayType 列展平为多个顶级列。在这种情况下,每个数组只包含 2 个项目,这很容易。您只需使用 Column.getItem() 将数组的每个部分作为列本身检索:

    split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
    df = df.withColumn('NAME1', split_col.getItem(0))
    df = df.withColumn('NAME2', split_col.getItem(1))
    

    结果将是:

    col1 | my_str_col | NAME1 | NAME2
    -----+------------+-------+------
      18 |  856-yygrm |   856 | yygrm
     201 |  777-psgdg |   777 | psgdg
    

    我不确定在嵌套数组从行到行的大小不同的一般情况下如何解决这个问题。

    【讨论】:

    • 有没有办法将剩余的项目放在一个列中?即第三列中的split_col.getItem(2 - n)。我想像上面的循环那样为所有项目创建列然后将它们连接起来可能会起作用,但我不知道这是否非常有效。
    • 使用 df.withColumn('NAME_remaining', pyspark.sql.functions.split(df[my_str_col'],'-',3).getItem(2) 获取剩余项。@987654321 @
    • 我发现,如果您尝试将拆分项目之一分配回原始列,则必须在拆分之前使用 withColumnRenamed() 重命名原始列,以避免出现明显相关的错误到issues.apache.org/jira/browse/SPARK-14948
    • 如何进行拆分,使拆分的第一部分是列名,第二部分是列值?
    【解决方案2】:

    这是针对一般情况的解决方案,不需要提前知道数组的长度,使用collect,或使用udfs。不幸的是,这只适用于spark 2.1 及更高版本,因为它需要posexplode 函数。

    假设您有以下 DataFrame:

    df = spark.createDataFrame(
        [
            [1, 'A, B, C, D'], 
            [2, 'E, F, G'], 
            [3, 'H, I'], 
            [4, 'J']
        ]
        , ["num", "letters"]
    )
    df.show()
    #+---+----------+
    #|num|   letters|
    #+---+----------+
    #|  1|A, B, C, D|
    #|  2|   E, F, G|
    #|  3|      H, I|
    #|  4|         J|
    #+---+----------+
    

    拆分letters 列,然后使用posexplode 分解生成的数组以及数组中的位置。接下来使用pyspark.sql.functions.expr 抓取此数组中索引pos 处的元素。

    import pyspark.sql.functions as f
    
    df.select(
            "num",
            f.split("letters", ", ").alias("letters"),
            f.posexplode(f.split("letters", ", ")).alias("pos", "val")
        )\
        .show()
    #+---+------------+---+---+
    #|num|     letters|pos|val|
    #+---+------------+---+---+
    #|  1|[A, B, C, D]|  0|  A|
    #|  1|[A, B, C, D]|  1|  B|
    #|  1|[A, B, C, D]|  2|  C|
    #|  1|[A, B, C, D]|  3|  D|
    #|  2|   [E, F, G]|  0|  E|
    #|  2|   [E, F, G]|  1|  F|
    #|  2|   [E, F, G]|  2|  G|
    #|  3|      [H, I]|  0|  H|
    #|  3|      [H, I]|  1|  I|
    #|  4|         [J]|  0|  J|
    #+---+------------+---+---+
    

    现在我们根据这个结果创建两个新列。第一个是我们的新列的名称,它将是letter 和数组中的索引的串联。第二列将是数组中相应索引处的值。我们通过利用pyspark.sql.functions.expr 的功能来获得后者,它允许我们使用use column values as parameters

    df.select(
            "num",
            f.split("letters", ", ").alias("letters"),
            f.posexplode(f.split("letters", ", ")).alias("pos", "val")
        )\
        .drop("val")\
        .select(
            "num",
            f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
            f.expr("letters[pos]").alias("val")
        )\
        .show()
    #+---+-------+---+
    #|num|   name|val|
    #+---+-------+---+
    #|  1|letter0|  A|
    #|  1|letter1|  B|
    #|  1|letter2|  C|
    #|  1|letter3|  D|
    #|  2|letter0|  E|
    #|  2|letter1|  F|
    #|  2|letter2|  G|
    #|  3|letter0|  H|
    #|  3|letter1|  I|
    #|  4|letter0|  J|
    #+---+-------+---+
    

    现在我们可以只 groupBy numpivot DataFrame。综上所述,我们得到:

    df.select(
            "num",
            f.split("letters", ", ").alias("letters"),
            f.posexplode(f.split("letters", ", ")).alias("pos", "val")
        )\
        .drop("val")\
        .select(
            "num",
            f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
            f.expr("letters[pos]").alias("val")
        )\
        .groupBy("num").pivot("name").agg(f.first("val"))\
        .show()
    #+---+-------+-------+-------+-------+
    #|num|letter0|letter1|letter2|letter3|
    #+---+-------+-------+-------+-------+
    #|  1|      A|      B|      C|      D|
    #|  3|      H|      I|   null|   null|
    #|  2|      E|      F|      G|   null|
    #|  4|      J|   null|   null|   null|
    #+---+-------+-------+-------+-------+
    

    【讨论】:

    • 仅供参考,我尝试使用 3909 个元素拆分约 170 万原始行,但速度太慢/一小时后未完成
    【解决方案3】:

    这是另一种方法,以防您想用分隔符分割字符串。

    import pyspark.sql.functions as f
    
    df = spark.createDataFrame([("1:a:2001",),("2:b:2002",),("3:c:2003",)],["value"])
    df.show()
    +--------+
    |   value|
    +--------+
    |1:a:2001|
    |2:b:2002|
    |3:c:2003|
    +--------+
    
    df_split = df.select(f.split(df.value,":")).rdd.flatMap(
                  lambda x: x).toDF(schema=["col1","col2","col3"])
    
    df_split.show()
    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|   a|2001|
    |   2|   b|2002|
    |   3|   c|2003|
    +----+----+----+
    

    我不认为这种到 RDD 的来回转换会减慢你的速度...... 也不必担心最后的模式规范:它是可选的,您可以避免将解决方案推广到列大小未知的数据。

    【讨论】:

    • 如何在 scala 中做到这一点?我被 flatMap lambda 函数卡住了
    • 注意模式是作为正则表达式给出的,因此您需要使用 \ 来表示特殊字符
    • 如果不想在表达式中引用df,可以将列名传递给split,即df.select(f.split("value",":"))...
    • @moshebeeri 你救了我!
    【解决方案4】:

    我理解你的痛苦。使用 split() 可以工作,但也可能导致中断。

    让我们拿你的 df 做些小改动:

    df = spark.createDataFrame([('1:"a:3":2001',),('2:"b":2002',),('3:"c":2003',)],["value"]) 
    
    df.show()
    
    +------------+
    |       value|
    +------------+
    |1:"a:3":2001|
    |  2:"b":2002|
    |  3:"c":2003|
    +------------+
    

    如果您尝试将 split() 应用于上述内容:

    df_split = df.select(split(df.value,":")).rdd.flatMap(
                  lambda x: x).toDF(schema=["col1","col2","col3"]).show()
    

    你会得到

    IllegalStateException:输入行没有架构所需的预期值数量。需要 4 个字段,提供 3 个值。

    那么,有没有更优雅的方法来解决这个问题?我很高兴有人向我指出了这一点。 pyspark.sql.functions.from_csv() 是你的朋友。

    以我上面的例子df:

    from pyspark.sql.functions import from_csv
    
    # Define a column schema to apply with from_csv()
    col_schema = ["col1 INTEGER","col2 STRING","col3 INTEGER"]
    schema_str = ",".join(col_schema)
    
    # define the separator because it isn't a ','
    options = {'sep': ":"}
    
    # create a df from the value column using schema and options
    df_csv = df.select(from_csv(df.value, schema_str, options).alias("value_parsed"))
    df_csv.show()
    
    +--------------+
    |  value_parsed|
    +--------------+
    |[1, a:3, 2001]|
    |  [2, b, 2002]|
    |  [3, c, 2003]|
    +--------------+
    

    然后我们可以轻松地将 df 展平以将值放在列中:

    df2 = df_csv.select("value_parsed.*").toDF("col1","col2","col3")
    df2.show()
    
    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1| a:3|2001|
    |   2|   b|2002|
    |   3|   c|2003|
    +----+----+----+
    

    没有休息。数据正确解析。生活很好。喝杯啤酒。

    【讨论】:

      猜你喜欢
      • 2020-11-10
      • 2022-12-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-27
      • 1970-01-01
      相关资源
      最近更新 更多