【问题标题】:How to generate, then reduce, a massive set of DataFrames from each row of one DataFrame in PySpark?如何从 PySpark 中一个 DataFrame 的每一行生成然后减少大量 DataFrame?
【发布时间】:2021-12-11 23:33:56
【问题描述】:

很遗憾,我无法分享我的实际代码或数据,因为它是专有的,但如果读者从文本中不清楚问题,我可以生成 MWE。

我正在处理一个包含约 5000 万行的数据框,每行都包含一个大型 XML 文档。从每个 XML 文档中,我提取了与标签之间的出现次数和层次关系相关的统计信息列表(没有像未记录的 XML 格式那样让一天变得更加美好)。我可以在数据帧中表达这些统计数据,并且可以使用 GROUP BY/SUM 和 DISTINCT 等标准操作将这些数据帧组合到多个文档中。目标是提取所有 5000 万份文档的统计数据,并将它们表达在单个数据框中。

问题是我不知道如何从 Spark 中的一个数据帧的每一行有效地生成 5000 万个数据帧,或者如何告诉 Spark 使用二元运算符将 5000 万个数据帧的列表减少为一个数据帧。是否有执行这些操作的标准函数?

到目前为止,我发现的唯一解决方法非常低效(将数据存储为字符串,解析它,进行计算,然后将其转换回字符串)。使用此方法需要数周才能完成,因此不实用。

【问题讨论】:

  • 你好佐尔戈斯!正如您暗示的那样,请提供 MWE,以便其他人可以使用解决方案更新代码。

标签: sql dataframe apache-spark pyspark


【解决方案1】:

每行的每个 XML 响应的提取和统计数据可以存储在行本身的附加列中。这样,spark 应该能够在其多个执行程序中执行流程,从而提高性能。 这是一个伪代码。

from pyspark.sql.types import StructType, StructField, IntegerType, 
StringType, DateType, FloatType, ArrayType

def extract_metrics_from_xml(row):
    j = row['xmlResponse'] # assuming your xml column name is xmlResponse
    # perform your xml extractions and computations for the xmlResponse in python
    ...
    load_date = ...
    stats_data1 = ...
    
    return Row(load_date, stats_data1, stats_data2, stats_group)

  
schema = schema = StructType([StructField('load_date', DateType()),
                     StructField('stats_data1', FloatType()),
                     StructField('stats_data2', ArrayType(IntegerType())),
                     StructField('stats_group', StringType())
                     ])
df_with_xml_stats = original_df.rdd\
                            .map(extract_metrics_from_xml)\
                            .toDF(schema=schema, sampleRatio=1)\
                            .cache()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-26
    • 2016-07-20
    • 1970-01-01
    • 1970-01-01
    • 2020-04-14
    • 1970-01-01
    • 2020-02-11
    • 2021-09-22
    相关资源
    最近更新 更多