【发布时间】:2019-06-16 03:50:56
【问题描述】:
我想根据特定条件从大型 DataFrame 生成分层 TFrecord 文件,为此我使用write.partitionBy()。我也在 SPARK 中使用 tensorflow-connector,但这显然不能与 write.partitionBy() 操作一起使用。因此,除了尝试分两步工作之外,我还没有找到其他方法:
- 根据我的情况使用
partitionBy()重新分区数据帧并将生成的分区写入 parquet 文件。 - 使用 tensorflow-connector 插件读取这些 parquet 文件以将其转换为 TFrecord 文件。
这是我无法有效完成的第二步。我的想法是读取 executors 上的各个 parquet 文件并立即将它们写入 TFrecord 文件。但这需要访问只能在驱动程序 (discussed here) 中完成的 SQLContext,因此不能并行。我想做这样的事情:
# List all parquet files to be converted
import glob, os
files = glob.glob('/path/*.parquet'))
sc = SparkSession.builder.getOrCreate()
sc.parallelize(files, 2).foreach(lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))
我可以构造函数convert_parquet_to_tfrecord 来在执行程序上执行此操作吗?
我也试过在读取所有 parquet 文件时只使用通配符:
SQLContext(sc).read.parquet('/path/*.parquet')
这确实会读取所有 parquet 文件,但不幸的是不会读取到单个分区中。看起来原始结构丢失了,所以如果我想要将单个镶木地板文件的确切内容转换为 TFrecord 文件,这对我没有帮助。
还有其他建议吗?
【问题讨论】:
标签: apache-spark pyspark pyspark-sql parquet tfrecord