【问题标题】:Pyspark Parquet with struct column带有结构列的 Pyspark Parquet
【发布时间】:2019-11-29 18:44:43
【问题描述】:

我想将嵌套对象(“struct”)添加到 pySpark 数据帧并将其写入 parquet。我想重新创建以下内容(目前使用 Scala spark + udf (How to add a new Struct column to a DataFrame) 准备):

 |-- _level1: struct (nullable = true)
 |    |-- level2a: struct (nullable = true)
 |    |    |-- fielda: string (nullable = true)
 |    |    |-- fieldb: string (nullable = true)
 |    |    |-- fieldc: string (nullable = true)
 |    |    |-- fieldd: string (nullable = true)
 |    |    |-- fielde: string (nullable = true)
 |    |    |-- fieldf: string (nullable = true)
 |    |-- level2b: struct (nullable = true)
 |    |    |-- fielda: string (nullable = true)
 |    |    |-- fieldb: string (nullable = true)
 |    |    |-- fieldc: string (nullable = true)

最好的方法是什么?

【问题讨论】:

  • 是的,当然,我尝试了各种方法。从那以后,我想出了一些可行的方法并在下面发布。

标签: python apache-spark pyspark


【解决方案1】:

我已经想办法做我想做的事了。这个想法是为嵌套列(结构)创建架构,如下所示:

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([
            StructField('level2a',
                        StructType(
                            [
                                StructField('fielda', StringType(), nullable=False),
                                StructField('fieldb', StringType(), nullable=False),
                                StructField('fieldc', StringType(), nullable=False),
                                StructField('fieldd', StringType(), nullable=False),
                                StructField('fielde', StringType(), nullable=False),
                                StructField('fieldf', StringType(), nullable=False)
                            ])
                        ),
            StructField('level2b',
                        StructType(
                            [
                                StructField('fielda', StringType(), nullable=False),
                                StructField('fieldb', StringType(), nullable=False),
                                StructField('fieldc', StringType(), nullable=False)
                            ])
                        )
        ])

然后可以将其与 udf(将上述模式作为参数)结合使用以获得所需的结果。


def make_meta(fielda, fieldb, fieldc, fieldd, fielde, fieldf, fieldalvl2, fieldblvl2, fieldclvl2):
    return [
        [fielda, fieldb, fieldc, fieldd, fielde, fieldf],
        [fieldalvl2, fieldblvl2, fieldclvl2]
    ]

test_udf = udf(lambda fielda,
               fieldb,
               fieldc,
               fieldd,
               fieldf,
               fielde,
               fieldalvl2, fieldblvl2, fieldclvl2:
               make_meta(fielda,
               fieldb,
               fieldc,
               fieldd,
               fieldf,
               fielde, fieldalvl2, fieldblvl2, fieldclvl2),
               schema)

df = spark.range(0, 5)
df.withColumn("test", test_udf(lit("a"), lit("b"), lit("c"),lit("d"),lit("e"),lit("f"),lit("a"),lit("b"),lit("c"))).printSchema()

打印以下内容:

root
 |-- id: long (nullable = false)
 |-- test: struct (nullable = true)
 |    |-- level2a: struct (nullable = true)
 |    |    |-- fielda: string (nullable = false)
 |    |    |-- fieldb: string (nullable = false)
 |    |    |-- fieldc: string (nullable = false)
 |    |    |-- fieldd: string (nullable = false)
 |    |    |-- fielde: string (nullable = false)
 |    |    |-- fieldf: string (nullable = false)
 |    |-- level2b: struct (nullable = true)
 |    |    |-- fielda: string (nullable = false)
 |    |    |-- fieldb: string (nullable = false)
 |    |    |-- fieldc: string (nullable = false)

在 scala 中,可以从 udf 返回案例类的实例,这是我在 python 中尝试做的(即返回一个对象)

【讨论】:

    【解决方案2】:

    如果你想嵌套列,你可以使用struct function。这将比使用用户定义的函数 (udf) 更有效,因为这些操作直接在 Java 虚拟机上进行。

    这是一个例子:

    In [1]: from pyspark.sql.functions import struct, col
       ...: 
       ...: df = spark.createDataFrame([(list("abcdefABC"))],
       ...:                            schema=list("abcdefghi")
       ...:                            )
       ...: df2 = df.select(
       ...:     struct(
       ...:         struct(*(col(_).alias("field%s" % _) for _ in "abcdef")).alias("level2a"),
       ...:         struct(*(col(_).alias("field%s" % (chr(ord(_) - 6))) for _ in ("ghi"))).alias("level2b")
       ...:     ).alias("_level1")
       ...: )
       ...: 
       ...: df2.printSchema()
       ...: 
       ...: 
    root
     |-- _level1: struct (nullable = false)
     |    |-- level2a: struct (nullable = false)
     |    |    |-- fielda: string (nullable = true)
     |    |    |-- fieldb: string (nullable = true)
     |    |    |-- fieldc: string (nullable = true)
     |    |    |-- fieldd: string (nullable = true)
     |    |    |-- fielde: string (nullable = true)
     |    |    |-- fieldf: string (nullable = true)
     |    |-- level2b: struct (nullable = false)
     |    |    |-- fielda: string (nullable = true)
     |    |    |-- fieldb: string (nullable = true)
     |    |    |-- fieldc: string (nullable = true)
    
    

    这里进行了一些字符串数学运算(chr 获取某个索引处的 Unicode 符号,ord 获取符号的代码点)以防止重复(struct(col("a").alias("fielda"), col("b").alias("fieldb"), …)),但主要信息是:使用struct 从其他列创建一个新的结构化列。

    【讨论】:

    • 谢谢 - 将执行时间(每个作业)从 12 秒减少到 6 秒。
    猜你喜欢
    • 2020-08-04
    • 2016-09-21
    • 2020-08-27
    • 1970-01-01
    • 2021-12-16
    • 2021-10-18
    • 2022-01-09
    • 2016-11-09
    • 1970-01-01
    相关资源
    最近更新 更多