【发布时间】:2017-05-07 17:33:29
【问题描述】:
我正在使用 pyspark,其架构与本文末尾显示的架构相称(注意嵌套列表、无序字段),最初是从 Parquet 作为 DataFrame 导入的。从根本上说,我遇到的问题是无法将这些数据作为 RDD 处理,然后再转换回 DataFrame。 (我已经查看了几篇相关的帖子,但我仍然无法确定我哪里出错了。)
简单地说,以下代码可以正常工作(正如人们所期望的那样):
schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
tripDFNew = sqlContext.createDataFrame(tripRDD, schema)
tripDFNew.take(1)
当我需要映射 RDD 时(例如添加字段的情况),事情就不起作用了。
schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
def trivial_map(row):
rowDict = row.asDict()
return pyspark.Row(**rowDict)
tripRDDNew = tripRDD.map(lambda row: trivial_map(row))
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema)
tripDFNew.take(1)
上面的代码给出了以下异常,其中 XXX 是整数的替代,它会随着运行而变化(例如,我见过 1、16、23 等):
File "/opt/cloudera/parcels/CDH-5.8.3-
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in
toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple XXX with StructType`
鉴于此信息,第二个代码块中是否存在明显错误? (我注意到tripRDD 属于rdd.RDD 类,而tripRDDNew 属于rdd.PipelinedRDD 类,但我认为这应该不是问题。)(我还注意到tripRDD 的架构不是按字段名排序的,而tripRDDNew 的架构按字段名称排序。同样,我不明白为什么会出现问题。)
架构:
root
|-- foo: struct (nullable = true)
| |-- bar_1: integer (nullable = true)
| |-- bar_2: integer (nullable = true)
| |-- bar_3: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- baz_1: integer (nullable = true)
| | | |-- baz_2: string (nullable = true)
| | | |-- baz_3: double (nullable = true)
| |-- bar_4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- baz_1: integer (nullable = true)
| | | |-- baz_2: string (nullable = true)
| | | |-- baz_3: double (nullable = true)
|-- qux: integer (nullable = true)
|-- corge: integer (nullable = true)
|-- uier: integer (nullable = true)`
【问题讨论】: