【问题标题】:Create Spark DataFrame from Pandas DataFrames inside RDD从 RDD 中的 Pandas DataFrames 创建 Spark DataFrame
【发布时间】:2019-10-20 12:54:35
【问题描述】:

我正在尝试将每个工作节点上的 Pandas DataFrame(每个元素都是 Pandas DataFrame 的 RDD)转换为跨所有工作节点的 Spark DataFrame。

例子:

def read_file_and_process_with_pandas(filename):
    data = pd.read(filename)
    """
    some additional operations using pandas functionality
    here the data is a pandas dataframe, and I am using some datetime
    indexing which isn't available for spark dataframes
    """
    return data

filelist = ['file1.csv','file2.csv','file3.csv']
rdd = sc.parallelize(filelist)
rdd = rdd.map(read_file_and_process_with_pandas)

之前的操作有效,所以我有一个 Pandas DataFrames 的 RDD。完成 Pandas 处理后,如何将其转换为 Spark DataFrame?

我尝试做rdd = rdd.map(spark.createDataFrame),但是当我做rdd.take(5) 之类的事情时,我收到以下错误:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o103.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

有没有办法将每个工作节点中的 Pandas DataFrames 转换为分布式 DataFrame?

【问题讨论】:

  • 您想要一个 rdd 的 spark DataFrames 吗?我不认为这是可能的。你为什么要这样做?最终目标是什么?
  • 你的 spark 是哪个版本的?
  • 我需要使用 Panda 的日期时间索引,这对于 spark RDD 或数据帧是不可能的

标签: pandas apache-spark pyspark


【解决方案1】:

看到这个问题:https://stackoverflow.com/a/51231046/7964197

我不得不处理同样的问题,这似乎很常见(使用 pandas 读取许多文件,例如 excel/pickle/任何其他非 spark 格式,并将生成的 RDD 转换为 spark 数据帧)

提供的代码在SparkSession 上添加了一个新方法,该方法使用pyarrowpd.DataFrame 对象转换为箭头记录批次,然后直接转换为pyspark.DataFrame 对象

spark_df = spark.createFromPandasDataframesRDD(prdd) # prdd is an RDD of pd.DataFrame objects

对于大量数据,这比转换为 Row() 对象的 RDD 快几个数量级。

【讨论】:

  • 感谢您的回复!我认为当您需要使用 pandas 的日期时间索引功能时,这是一个非常常见的问题。如果 spark 可以本地处理转换,那就太好了。
【解决方案2】:

Pandas 数据帧不能直接转换为 rdd。 您可以从 Pandas 创建 Spark DataFrame

spark_df = context.createDataFrame(pandas_df)

参考:Introducing DataFrames in Apache Spark for Large Scale Data Science

【讨论】:

  • 我已经尝试过了,你可以在我的回答中看到。这不是我真正想要的,我正在尝试将每个工作节点上的 RDD 中的 pandas 数据帧映射到所有节点的 spark 数据帧。
  • 好吧~那样的话,我觉得你最好写一些UDF,然后把panda放到你的UDF中。
猜你喜欢
  • 2019-07-08
  • 1970-01-01
  • 1970-01-01
  • 2021-12-13
  • 1970-01-01
  • 1970-01-01
  • 2016-06-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多