【发布时间】:2016-08-23 23:59:50
【问题描述】:
我正在尝试使用Spark 1.6.1 将parquet 文件写入Amazon S3。我生成的小parquet 是~2GB,一旦写入,它就没有那么多数据。我试图证明Spark 是我可以使用的平台。
基本上我要做的是设置一个star schema 和dataframes,然后我要把这些表写成镶木地板。数据来自供应商提供的 csv 文件,我使用 Spark 作为ETL 平台。我目前在ec2(r3.2xlarge) 中有一个 3 节点集群,所以在执行程序上的内存为120GB,总共有 16 个内核。
输入文件总共大约 22GB,我现在正在提取大约 2GB 的数据。最终,当我开始加载完整数据集时,这将是许多 TB。
这是我的 spark/scala pseudocode:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
465884512 行的计数大约需要 2 分钟。写入 parquet 需要 38 分钟
我知道coalesce 对执行写入操作的驱动程序进行了洗牌......但它所花费的时间让我觉得我做错了什么。如果没有coalesce,这仍然需要 15 分钟,IMO 仍然太长,并且给了我大量的小 parquet 文件。我希望每天拥有一个大文件的数据。我也有代码来执行按字段值分区,而且它同样慢。我也尝试将其输出到csv,这需要大约 1 小时。
另外,我在提交作业时并没有真正设置运行时道具。我的一项工作的控制台统计数据是:
- 活着的工人:2
- 正在使用的核心:总共 16 个,已使用 16 个
- 正在使用的内存:总计 117.5 GB,已使用 107.5 GB
- 申请:1 个正在运行,5 个已完成
- 驱动程序:0 个正在运行,0 个已完成
- 状态:活着
【问题讨论】:
-
a coalesce 不会对驱动程序进行改组,而是在执行程序之间改组,但这与您看到的问题无关。你在使用电子病历吗?如果是这样,请使用 s3:// 而不是 s3a://。无论哪种方式,在 Spark 1.6 上,您都应该像 @David 所说的那样使用 Direct OutputCommitter。另一个可能的改进是将 parquet.enable.summary-metadata 设置为 false
-
在 S3 前使用 Alluxio 是否会加快速度?
标签: scala amazon-s3 apache-spark apache-spark-sql parquet