【发布时间】:2019-05-16 02:09:46
【问题描述】:
有没有一种简单的方法可以将DataFrame 保存到一个单个 parquet 文件中,或者将包含元数据的目录和parquet 生成的parquet 文件的一部分合并到一个单个文件存储在NFS上而不使用HDFS和hadoop?
【问题讨论】:
标签: apache-spark apache-spark-sql
有没有一种简单的方法可以将DataFrame 保存到一个单个 parquet 文件中,或者将包含元数据的目录和parquet 生成的parquet 文件的一部分合并到一个单个文件存储在NFS上而不使用HDFS和hadoop?
【问题讨论】:
标签: apache-spark apache-spark-sql
要只保存一个文件而不是多个文件,您可以在保存数据之前在 RDD/Dataframe 上调用 coalesce(1) / repartition(1)。
如果您已经有一个包含小文件的目录,您可以创建一个 Compacter 进程,该进程将读取现有文件并将它们保存到一个新文件中。例如
val rows = parquetFile(...).coalesce(1)
rows.saveAsParquetFile(...)
您可以使用 saveAsParquetFile 存储到本地文件系统。例如
rows.saveAsParquetFile("/tmp/onefile/")
【讨论】:
我能够使用此方法在 Spark 1.6.1 中使用 snappy 格式压缩 parquet 文件。我使用了覆盖,以便我可以在需要时重复该过程。这是代码。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
object CompressApp {
val serverPort = "hdfs://myserver:8020/"
val inputUri = serverPort + "input"
val outputUri = serverPort + "output"
val config = new SparkConf()
.setAppName("compress-app")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(config)
val sqlContext = SQLContext.getOrCreate(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
import sqlContext.implicits._
def main(args: Array[String]) {
println("Compressing Parquet...")
val df = sqlContext.read.parquet(inputUri).coalesce(1)
df.write.mode(SaveMode.Overwrite).parquet(outputUri)
println("Done.")
}
}
【讨论】:
coalesce(N) 到目前为止救了我。
如果您的表是分区的,那么也使用repartition("partition key")。
【讨论】: