【问题标题】:PySpark - Convert python datastructure to RDD on executorPySpark - 在执行器中将 python 数据结构转换为 RDD
【发布时间】:2026-01-11 05:50:01
【问题描述】:

我正在使用 Spark 并行化一些现有代码,这些代码执行一些数据提取并返回一个 pandas 数据帧。我想将这些 pandas 数据帧转换为一个或多个 Spark 数据帧。

铌。现有代码非常复杂(涉及调用本机库等),因此不能将其直接移植到 Spark 代码中。

这是代码的简化示例:

import pandas as pd

def extract_df(s):
    # Lots of existing code that returns a large pandas dataframe
    # ...
    return pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]})

sRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
dfsRDD = sRDD.map(lambda s: extract_df(s))

我知道我可以通过在驱动程序上收集将 datesRDD 转换为 Spark 数据帧。

spark.createDataFrame(pd.concat(rdd.collect(), ignore_index=True)).show()

但这当然要求我可以将整个 Pandas 数据帧集合保存在内存中,而我做不到。

目前,我正在将 Pandas 数据帧写入 S3 上的 json,然后使用 Spark 读取,但这会使用 lot 的存储空间。

有什么方法可以告诉 Spark 在执行器本身上转换为 DataFrame/RDD?还是我错过了另一种方法?

【问题讨论】:

    标签: apache-spark pyspark spark-dataframe


    【解决方案1】:

    很好,flatMap 来救命了!

    import pandas as pd
    
    def extract_df(s):
        # Lots of existing code that returns a **huge** pandas dataframe
        # ...
        df =  pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]})
        return df.values.tolist()
    
    datesRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
    
    dfsRDD = datesRDD.flatMap(lambda s: extract_df(s))
    
    spark.createDataFrame(dfsRDD, schema=['x', 'y', 'z']).show()
    
    +---+---+---+
    |  x|  y|  z|
    +---+---+---+
    |  A|  1|  4|
    |  A|  2|  5|
    |  A|  3|  6|
    |  B|  1|  4|
    |  B|  2|  5|
    |  B|  3|  6|
    |  C|  1|  4|
    |  C|  2|  5|
    |  C|  3|  6|
    +---+---+---+
    

    【讨论】:

      最近更新 更多