【问题标题】:How to save a huge pandas dataframe to hdfs?如何将巨大的熊猫数据框保存到 hdfs?
【发布时间】:2018-05-03 17:31:53
【问题描述】:

我正在使用 pandas 和 spark 数据框。数据帧总是非常大(> 20 GB),标准的火花函数不足以满足这些大小。目前我正在将我的熊猫数据框转换为像这样的火花数据框:

dataframe = spark.createDataFrame(pandas_dataframe)  

我进行这种转换是因为使用 spark 将数据帧写入 hdfs 非常容易:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")

但是对于大于 2 GB 的数据帧,转换失败。 如果我将 spark 数据框转换为 pandas,我可以使用 pyarrow:

// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)

这是从 spark 到 pandas 的快速转换,它也适用于大于 2 GB 的数据帧。我还没有找到相反的方法。意味着有一个熊猫数据框,我在 pyarrow 的帮助下将其转换为火花。问题是我真的找不到如何将 pandas 数据帧写入 hdfs。

我的熊猫版本:0.19.0

【问题讨论】:

  • 您遇到什么错误?您确定应用程序在写入失败或之前(在数据帧转换期间)失败了吗?
  • 由于 java 堆空间有限并且 createDataFrame 方法正在 java 堆上构建字节数组,因此失败并出现内存不足异常。为了解决这个问题,我们需要 pyarrow 解决方案。如前所述,它已经完美地适用于大熊猫的火花。但我也需要 pandas 来激发火花,因为我找不到将 pandas 直接保存到 hdfs 或 hive 的方法。
  • 只是好奇——在这种大小下,为什么不直接将数据写入数据库呢?以 Postgres 为例,如果您仍想编写 Python 或 C 代码在数据库中对其进行操作。
  • 一个 hack 可能是从大的创建 N 个 pandas 数据帧(每个小于 2 GB)(水平分区)并创建 N 个不同的 spark 数据帧,然后合并(联合)它们以创建最后一个写入 HDFS。我假设你的主机很强大,但你也有一个运行 Spark 的集群。

标签: python pandas apache-spark pyarrow apache-arrow


【解决方案1】:

意味着有一个熊猫数据框,我在 pyarrow 的帮助下将其转换为火花。

pyarrow.Table.fromPandas 是您要查找的函数:

Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)

Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa

pdf = ...  # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf)  # type: pyarrow.lib.Table

结果可以直接写入 Parquet / HDFS,无需通过 Spark 传递数据:

import pyarrow.parquet as pq

fs  = pa.hdfs.connect()

with fs.open(path, "wb") as fw
    pq.write_table(adf, fw)

另请参阅

火花笔记

此外,自 Spark 2.3(当前主控)以来,createDataFrame (SPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame) 直接支持 Arrow。它uses SparkContext.defaultParallelism to compute number of chunks 这样您就可以轻松控制各个批次的大小。

最后defaultParallelism 可用于控制使用标准_convert_from_pandas 生成的分区数量,从而有效地将切片的大小减小到更易于管理的程度。

不幸的是,这些不太可能解决您的current memory problems。两者都依赖于parallelize,因此将所有数据存储在驱动节点的内存中。切换到箭头或调整配置只能加快进程或解决块大小限制。

实际上,只要您使用本地 Pandas DataFrame 作为输入,我看不出有任何理由在这里切换到 Spark。这种情况下最严重的瓶颈是驱动程序的网络 I/O,而分发数据无法解决这个问题。

【讨论】:

    【解决方案2】:

    来自https://issues.apache.org/jira/browse/SPARK-6235

    支持并行化大于 2GB 的 R data.frame

    已解决。

    来自https://pandas.pydata.org/pandas-docs/stable/r_interface.html

    将 DataFrame 转换为 R 对象

    您可以将 pandas 数据帧转换为 R 数据帧

    所以也许转换 pandas -> R -> Spark -> hdfs?

    【讨论】:

      【解决方案3】:

      另一种方法是将您的 pandas 数据帧转换为 spark 数据帧(使用 pyspark)并使用 save 命令将其保存到 hdfs。 例子

          df = pd.read_csv("data/as/foo.csv")
          df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
          sc = SparkContext(conf=conf)
          sqlCtx = SQLContext(sc)
          sdf = sqlCtx.createDataFrame(df)
      
      

      这里astype 将您的列类型从object 更改为string。这样可以避免引发异常,因为 spark 无法识别 pandas 类型 object。但请确保这些列确实是字符串类型。

      现在将您的 df 保存在 hdfs 中:

          sdf.write.csv('mycsv.csv')
      

      【讨论】:

        【解决方案4】:

        一个 hack 可能是从大的创建 N 个 pandas 数据帧(每个小于 2 GB)(水平分区)并创建 N 个不同的 spark 数据帧,然后合并(联合)它们以创建最后一个写入 HDFS。我假设你的主机很强大,但你也有一个运行 Spark 的集群。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-05-21
          • 2017-08-02
          • 2021-12-06
          • 2018-07-03
          • 2021-11-06
          相关资源
          最近更新 更多