【问题标题】:How to lower the case of element names in ArrayType or MapType columns in PySpark?如何降低 PySpark 中 ArrayType 或 MapType 列中元素名称的大小写?
【发布时间】:2021-08-31 04:41:59
【问题描述】:

我正在尝试降低 PySpark Dataframe 架构的所有列名称的大小写,包括复杂类型列的元素名称。

例子:

original_df
 |-- USER_ID: long (nullable = true)
 |-- COMPLEX_COL_ARRAY: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- KEY: timestamp (nullable = true)
 |    |    |-- VALUE: integer (nullable = true)
target_df
 |-- user_id: long (nullable = true)
 |-- complex_col_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: timestamp (nullable = true)
 |    |    |-- value: integer (nullable = true)

但是,我只能使用以下脚本降低列名的大小写:

from pyspark.sql.types import StructField
schema = df.schema
schema.fields = list(map(lambda field: StructField(field.name.lower(), field.dataType), schema.fields))

我知道我可以使用以下语法访问嵌套元素的字段名称:

for f in schema.fields:
    if hasattr(f.dataType, 'elementType') and hasattr(f.dataType.elementType, 'fieldNames'):
        print(schema.f.dataType.elementType.fieldNames())

但是如何修改这些字段名的大小写呢?

感谢您的帮助!

【问题讨论】:

  • 我想降低所有 Parquet 模式的大小写,因为我在 Hive、Parquet、JSON 和 Spark 之间遇到了区分大小写的问题。

标签: python pyspark lowercase


【解决方案1】:

受此问题启发,建议回答我自己的问题:Rename nested field in spark dataframe

from pyspark.sql.types import StructField

# Read parquet file
path = "/path/to/data"
df = spark.read.parquet(path)
schema = df.schema

# Lower the case of all fields that are not nested
schema.fields = list(map(lambda field: StructField(field.name.lower(), field.dataType), schema.fields))

for f in schema.fields:
    # if field is nested and has named elements, lower the case of all element names
    if hasattr(f.dataType, 'elementType') and hasattr(f.dataType.elementType, 'fieldNames'):
        for e in f.dataType.elementType.fieldNames():
            schema[f.name].dataType.elementType[e].name =  schema[f.name].dataType.elementType[e].name.lower()
            ind = schema[f.name].dataType.elementType.names.index(e)
            schema[f.name].dataType.elementType.names[ind] = e.lower()

# Recreate dataframe with lowercase schema
df_lowercase = spark.createDataFrame(df.rdd, schema)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2010-11-07
    • 1970-01-01
    • 2018-12-28
    • 2018-07-18
    • 1970-01-01
    • 2019-03-29
    • 2019-07-29
    相关资源
    最近更新 更多