【发布时间】:2018-06-08 16:21:08
【问题描述】:
我有一个较大的(~20 GB)parquet 格式的分区数据集。我想使用pyarrow 从数据集中读取特定分区。我以为我可以使用pyarrow.parquet.ParquetDataset 完成此操作,但似乎并非如此。这是一个小例子来说明我想要什么。
创建随机数据集:
from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob
import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset
def get_partitions(basepath, partitions):
"""Generate directory hierarchy for a paritioned dataset
data
├── part1=foo
│ └── part2=True
├── part1=foo
│ └── part2=False
├── part1=bar
│ └── part2=True
└── part1=bar
└── part2=False
"""
path_tmpl = '/'.join(['{}={}'] * len(partitions)) # part=value
path_tmpl = '{}/{}'.format(basepath, path_tmpl) # part1=val/part2=val
parts = [product([part], vals) for part, vals in partitions.items()]
parts = [i for i in product(*parts)]
return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]
partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
# 3 columns, 5 rows
data = [pa.array(np.random.rand(5)) for i in range(3)]
table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
os.makedirs(part, exist_ok=True)
out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
table.schema, flavor='spark')
out.write_table(table)
out.close()
我想读取分区 1 的所有值,仅读取分区 2 的 True。使用 pandas.read_parquet,这是不可能的,我必须始终读取整个列。我用pyarrow 尝试了以下操作:
parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()
这也不行:
>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')
我可以像这样在pyspark 中轻松做到这一点:
def get_spark_session_ctx(appName):
"""Get or create a Spark Session, and the underlying Context."""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appName).getOrCreate()
sc = spark.sparkContext
return (spark, sc)
spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()
如下所示:
>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')
这可以用pyarrow 或pandas 完成,还是我需要一些自定义实现?
更新:应 Wes 的要求,现在在 JIRA。
【问题讨论】:
标签: python parquet pyarrow apache-arrow