【问题标题】:Dask read_parquet with pyarrow engineDask read_parquet 与 pyarrow 引擎
【发布时间】:2018-11-03 15:33:42
【问题描述】:

我有一个熊猫数据框。我正在使用 spark 将其保存到镶木地板上,然后尝试通过 dask 读取。问题是没有使用 pyarrow 引擎读回分区列。

df = pd.DataFrame({'i64': np.arange(1000, dtype=np.int64),
                            'Ii32': np.arange(1000, dtype=np.int32),
                            'f': np.arange(1000, dtype=np.float64),
                            't': [datetime.datetime.now()] * 1000,
                            'e': ['1'] * 998 + [None,'1'],
                            'g' : [np.NAN] * 998 + [None, ''],
                            'bhello': np.random.choice(['hello', 'Yo', 'people', '1'], size=1000).astype("O")})

spark = SparkSession \
            .builder \
            .appName("Python Spark arrow compatibility") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
        spark.conf.set("spark.sql.execution.arrow.enabled", "true")
        #enable metadata write from spark
        spark.conf.set("parquet.enable.summary-metadata",  "true")
        #convert pandas df to spark df
        sparkDf = spark.createDataFrame(df)

        #write to parquet
        sparkDf.write.parquet(path, partitionBy=['bhello'])

        #use dask to read the above saved parquet with pyarrow engine
        df2 = dd.read_parquet('hdfs://127.0.0.1:8020/tmp/test/outputParquet10',
                              engine='pyarrow',
                             )

        print(df2.columns)
        self.assertIn('bhello', df2.columns)

任何想法我在这里做错了什么

【问题讨论】:

  • 检查这一点,pyarrow 引擎似乎可以毫无问题地找到分区列,fastparquet 也是如此(在这种情况下,如果 spark 未配置为,则路径需要是全局字符串,而不仅仅是目录编写元数据文件)。这是本地文件夹,而不是 HDFS。
  • @mdurant 我用下面的设置尝试了上面的方法来写入元数据 spark.conf.set("parquet.enable.summary-metadata", "true") 但仍然是同样的问题。还尝试使用全局路径 'hdfs://127.0.0.1:8020/tmp/test/outputParquet10/*/*.parquet' 但没有运气
  • 对于 pyarrow,可能相关:github.com/dask/dask/issues/3518 是否有充分的理由不尝试使用 fastparquet?
  • 我们目前正在使用 fastparquet 本身,但 pyarrow 在扫描时给了我 10 倍的加速,因此想要迁移到那个
  • 这真是一个惊喜。

标签: python dask


【解决方案1】:

我假设这是一个最小的工作示例。因此我的解决方案是使用dask 读取它,然后使用fastparquetpyarrow 引擎对其进行转换。

代码如下。

import dask.dataframe as dd
ddf=dd.read_csv('/destination/of/your/file/file.format_name')
ddf.to_parquet('/destination/of/your/file/file.parquet',engine = 'fastparquet') #default is fastparquet if both engines are installed.

希望这会有所帮助。

谢谢

迈克尔

【讨论】:

    猜你喜欢
    • 2018-11-25
    • 2021-01-28
    • 2020-01-25
    • 2022-01-16
    • 2020-02-25
    • 1970-01-01
    • 1970-01-01
    • 2018-12-17
    • 2018-04-04
    相关资源
    最近更新 更多