【问题标题】:access fields of an array within pyspark dataframe访问pyspark数据框中数组的字段
【发布时间】:2018-07-27 19:12:47
【问题描述】:

我正在开发基于一组 ORC 文件的 spark 数据帧的 sql 查询。程序是这样的:

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("test").getOrCreate()
sdf = spark_session.read.orc("../data/")
sdf.createOrReplaceTempView("test")

现在我有一个名为“test”的表。如果我这样做:

spark_session.sql("select count(*) from test")

那么结果会很好。但我需要在查询中获取更多列,包括数组中的一些字段。

In [8]: sdf.take(1)[0]["person"]
Out[8]:
[Row(name='name', value='tom'),
 Row(name='age', value='20'),
 Row(name='gender', value='m')]

我尝试过类似的方法:

spark_session.sql("select person.age, count(*) from test group by person.age")

但这不起作用。我的问题是:如何访问“person”数组中的字段?

谢谢!

编辑:

sdf.printSchema() 的结果

In [3]: sdf.printSchema()
root
 |-- person: integer (nullable = true)
 |-- customtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)

错误信息:

AnalysisException: 'No such struct field age in name, value; line 16 pos 8'

【问题讨论】:

  • df=spark.sql(select * from test);func=udf(lambda x: x['age'],IntegerType()); df.withColumns('age',func(df.person)).groupby('age').count().show()

标签: pyspark pyspark-sql orc


【解决方案1】:

我不知道如何仅使用 PySpark-SQL 来做到这一点,但这里有一种使用 PySpark DataFrames 的方法。

基本上,我们可以使用create_map() 函数将结构列转换为MapType()。然后我们可以使用字符串索引直接访问字段。

考虑以下示例:

定义架构

schema = StructType([
        StructField('person', IntegerType()),
        StructField(
            'customtags',
            ArrayType(
                StructType(
                    [
                        StructField('name', StringType()),
                        StructField('value', StringType())
                    ]
                )
            )
        )
    ]
)

创建示例数据框

data = [
    (
        1, 
        [
            {'name': 'name', 'value': 'tom'},
            {'name': 'age', 'value': '20'},
            {'name': 'gender', 'value': 'm'}
        ]
    ),
    (
        2,
        [
            {'name': 'name', 'value': 'jerry'},
            {'name': 'age', 'value': '20'},
            {'name': 'gender', 'value': 'm'}
        ]
    ),
    (
        3,
        [
            {'name': 'name', 'value': 'ann'},
            {'name': 'age', 'value': '20'},
            {'name': 'gender', 'value': 'f'}
        ]
    )
]
df = sqlCtx.createDataFrame(data, schema)
df.show(truncate=False)
#+------+------------------------------------+
#|person|customtags                          |
#+------+------------------------------------+
#|1     |[[name,tom], [age,20], [gender,m]]  |
#|2     |[[name,jerry], [age,20], [gender,m]]|
#|3     |[[name,ann], [age,20], [gender,f]]  |
#+------+------------------------------------+

将结构列转换为地图

from operator import add
import pyspark.sql.functions as f

df = df.withColumn(
        'customtags',
        f.create_map(
            *reduce(
                add, 
                [
                    [f.col('customtags')['name'][i],
                     f.col('customtags')['value'][i]] for i in range(3)
                ]
            )
        )
    )\
    .select('person', 'customtags')

df.show(truncate=False)
#+------+------------------------------------------+
#|person|customtags                                |
#+------+------------------------------------------+
#|1     |Map(name -> tom, age -> 20, gender -> m)  |
#|2     |Map(name -> jerry, age -> 20, gender -> m)|
#|3     |Map(name -> ann, age -> 20, gender -> f)  |
#+------+------------------------------------------+

这里的问题是您必须先验地知道ArrayType()(在本例中为 3)的长度,因为我不知道动态循环它的方法。这也假设数组的所有行都具有相同的长度。

我必须在这里使用reduce(add, ...),因为create_map() 需要(key, value) 形式的元素对。

按地图列中的字段分组

df.groupBy((f.col('customtags')['name']).alias('name')).count().show()
#+-----+-----+
#| name|count|
#+-----+-----+
#|  ann|    1|
#|jerry|    1|
#|  tom|    1|
#+-----+-----+

df.groupBy((f.col('customtags')['gender']).alias('gender')).count().show()
#+------+-----+
#|gender|count|
#+------+-----+
#|     m|    2|
#|     f|    1|
#+------+-----+

【讨论】:

  • 现在有什么关于动态获取数组长度的建议吗?
猜你喜欢
  • 2017-07-03
  • 1970-01-01
  • 2017-10-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多