【发布时间】:2020-08-04 02:24:03
【问题描述】:
我有一个数据预处理管道,用于清理数万条推文中的数据。我想分阶段保存我的数据框,以便我可以从管道的后期阶段加载这些“保存点”。我已经读过,以镶木地板格式保存数据帧是最“有效”的编写方法,因为它快速、可扩展等,这对我来说是理想的,因为我正试图为这个项目牢记可扩展性。
但是,我遇到了一个问题,我似乎无法将包含结构的字段保存到文件中。尝试输出我的数据帧时收到 JSON 错误 json.decoder.JSONDecodeError: Expecting ',' delimiter: ...(更多详细信息如下)。
我的数据框目前采用以下格式:
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
| id| timestamp| tweet_text| tweet_hashtags|tweet_media| tweet_urls| topic| categories|priority|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|266269932671606786|1536170446|Eight dead in the...| []| []| []|guatemalaEarthqua...|[Report-EmergingT...| Low|
|266804609954234369|1536256997|Guys, lets help ... |[[Guatemala, [72,...| []|[[http:url... |guatemalaEarthqua...|[CallToAction-Don...| Medium|
|266250638852243457|1536169939|My heart goes out...|[[Guatemala, [31,...| []| []|guatemalaEarthqua...|[Report-EmergingT...| Medium|
|266381928989589505|1536251780|Strong earthquake...| []| []|[[http:url... |guatemalaEarthqua...|[Report-EmergingT...| Medium|
|266223346520297472|1536167235|Magnitude 7.5 Qua...| []| []| []|guatemalaEarthqua...|[Report-EmergingT...| Medium|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
only showing top 5 rows
为清楚起见,使用以下架构:
root
|-- id: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- tweet_text: string (nullable = true)
|-- tweet_hashtags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = false)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- tweet_media: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id_str: string (nullable = true)
| | |-- type: string (nullable = false)
| | |-- url: string (nullable = true)
| | |-- media_url: string (nullable = true)
| | |-- media_https: string (nullable = true)
| | |-- display_url: string (nullable = true)
| | |-- expanded_url: string (nullable = true)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- tweet_urls: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- url: string (nullable = false)
| | |-- display_url: string (nullable = true)
| | |-- expanded_url: string (nullable = true)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- topic: string (nullable = true)
|-- categories: array (nullable = true)
| |-- element: string (containsNull = true)
|-- priority: string (nullable = true)
我正在尝试使用以下行以镶木地板格式保存此数据框:
df.write.mode('overwrite').save(
path=f'{DATA_DIR}/interim/feature_select.parquet',
format='parquet')
还有df.write.parquet(f'{DATA_DIR}/interim/feature_select.parquet', mode='overwrite')。
但是,在尝试保存这些文件时,我收到了错误 json.decoder.JSONDecodeError: Expecting ',' delimiter: ...:
File "features.py", line 207, in <lambda>
entities_udf = F.udf(lambda s: _convert_str_to_arr(s), v)
File "features.py", line 194, in _convert_str_to_arr
arr = [json.loads(x) for x in arr]
File "features.py", line 194, in <listcomp>
arr = [json.loads(x) for x in arr]
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 353, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ',' delimiter: line 1 column 93 (char 92)
错误代码中所述的行还引用了我之前对多个列进行的UDF 转换(列tweet_*)。当我删除作家时,这工作正常。
我在为 parquet 文件指定分隔符时找不到太多信息,这可能吗?还是我必须序列化任何包含逗号的数据?或者我什至必须采用我已解析和更改的 Spark 结构,并将它们转换回 JSON 以保存文件?
【问题讨论】:
-
您解决问题了吗?它与我假设的 json 有关吗?
标签: json apache-spark pyspark user-defined-functions