【问题标题】:PySpark: Unable to write structs (DF -> Parquet)PySpark:无法编写结构(DF -> Parquet)
【发布时间】:2020-08-04 02:24:03
【问题描述】:

我有一个数据预处理管道,用于清理数万条推文中的数据。我想分阶段保存我的数据框,以便我可以从管道的后期阶段加载这些“保存点”。我已经读过,以镶木地板格式保存数据帧是最“有效”的编写方法,因为它快速、可扩展等,这对我来说是理想的,因为我正试图为这个项目牢记可扩展性。

但是,我遇到了一个问题,我似乎无法将包含结构的字段保存到文件中。尝试输出我的数据帧时收到 JSON 错误 json.decoder.JSONDecodeError: Expecting ',' delimiter: ...(更多详细信息如下)。

我的数据框目前采用以下格式:

+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|                id| timestamp|          tweet_text|      tweet_hashtags|tweet_media|          tweet_urls|               topic|          categories|priority|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|266269932671606786|1536170446|Eight dead in the...|                  []|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|     Low|
|266804609954234369|1536256997|Guys, lets help ... |[[Guatemala, [72,...|         []|[[http:url...       |guatemalaEarthqua...|[CallToAction-Don...|  Medium|
|266250638852243457|1536169939|My heart goes out...|[[Guatemala, [31,...|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|  Medium|
|266381928989589505|1536251780|Strong earthquake...|                  []|         []|[[http:url...       |guatemalaEarthqua...|[Report-EmergingT...|  Medium|
|266223346520297472|1536167235|Magnitude 7.5 Qua...|                  []|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|  Medium|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
only showing top 5 rows

为清楚起见,使用以下架构:

root
 |-- id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- tweet_hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = false)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- tweet_media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- type: string (nullable = false)
 |    |    |-- url: string (nullable = true)
 |    |    |-- media_url: string (nullable = true)
 |    |    |-- media_https: string (nullable = true)
 |    |    |-- display_url: string (nullable = true)
 |    |    |-- expanded_url: string (nullable = true)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- tweet_urls: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: string (nullable = false)
 |    |    |-- display_url: string (nullable = true)
 |    |    |-- expanded_url: string (nullable = true)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- topic: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- priority: string (nullable = true)

我正在尝试使用以下行以镶木地板格式保存此数据框:

df.write.mode('overwrite').save(
    path=f'{DATA_DIR}/interim/feature_select.parquet',
    format='parquet')

还有df.write.parquet(f'{DATA_DIR}/interim/feature_select.parquet', mode='overwrite')

但是,在尝试保存这些文件时,我收到了错误 json.decoder.JSONDecodeError: Expecting ',' delimiter: ...

  File "features.py", line 207, in <lambda>
    entities_udf = F.udf(lambda s: _convert_str_to_arr(s), v)
  File "features.py", line 194, in _convert_str_to_arr
    arr = [json.loads(x) for x in arr]
  File "features.py", line 194, in <listcomp>
    arr = [json.loads(x) for x in arr]
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ',' delimiter: line 1 column 93 (char 92)

错误代码中所述的行还引用了我之前对多个列进行的UDF 转换(列tweet_*)。当我删除作家时,这工作正常。

我在为 parquet 文件指定分隔符时找不到太多信息,这可能吗?还是我必须序列化任何包含逗号的数据?或者我什至必须采用我已解析和更改的 Spark 结构,并将它们转换回 JSON 以保存文件?

【问题讨论】:

  • 您解决问题了吗?它与我假设的 json 有关吗?

标签: json apache-spark pyspark user-defined-functions


【解决方案1】:

这个错误其实和 parquet 一点关系都没有。在采用action 之前,不会应用数据帧上的转换(在这种情况下,保存到镶木地板)。所以直到此时错误才会发生。

从错误中,我们可以看出实际的问题是行:

arr = [json.loads(x) for x in arr]

发生在UDF 转换中。

当 JSON 出现问题时,会出现 json.decoder.JSONDecodeError 错误。两个常见问题是它不是有效的 JSON 或存在引用问题,请参阅 here。所以,

  1. 确认列包含有效的 JSON。
  2. 尝试将\" 替换为\\",可以使用x.replace("\\", r"\\")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-08
    • 2022-11-30
    • 1970-01-01
    • 2017-12-03
    • 2022-01-12
    相关资源
    最近更新 更多