【发布时间】:2019-10-20 12:54:35
【问题描述】:
我正在尝试将每个工作节点上的 Pandas DataFrame(每个元素都是 Pandas DataFrame 的 RDD)转换为跨所有工作节点的 Spark DataFrame。
例子:
def read_file_and_process_with_pandas(filename):
data = pd.read(filename)
"""
some additional operations using pandas functionality
here the data is a pandas dataframe, and I am using some datetime
indexing which isn't available for spark dataframes
"""
return data
filelist = ['file1.csv','file2.csv','file3.csv']
rdd = sc.parallelize(filelist)
rdd = rdd.map(read_file_and_process_with_pandas)
之前的操作有效,所以我有一个 Pandas DataFrames 的 RDD。完成 Pandas 处理后,如何将其转换为 Spark DataFrame?
我尝试做rdd = rdd.map(spark.createDataFrame),但是当我做rdd.take(5) 之类的事情时,我收到以下错误:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o103.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
有没有办法将每个工作节点中的 Pandas DataFrames 转换为分布式 DataFrame?
【问题讨论】:
-
您想要一个
rdd的 spark DataFrames 吗?我不认为这是可能的。你为什么要这样做?最终目标是什么? -
你的 spark 是哪个版本的?
-
我需要使用 Panda 的日期时间索引,这对于 spark RDD 或数据帧是不可能的
标签: pandas apache-spark pyspark