【发布时间】:2018-11-28 05:11:02
【问题描述】:
我有相对大量的数据。 5000 个 orc 文件,每个大约 300 mb。和 4 部字典(每部几 kb)。
将其加载到数据框中(大约 1-2 小时)并加入后,我尝试对其进行分组和聚合。
tableDS
.join(broadcast(enq1), $"url".contains($"q1"), "left_outer")
.join(broadcast(enq2), $"url".contains($"q2"), "left_outer")
.join(broadcast(enq3), $"url".contains($"q3"), "left_outer")
.join(broadcast(model), $"url".contains($"model"), "left_outer")
.groupBy($"url", $"ctn", $"timestamp")
.agg(
collect_set(when($"q2".isNotNull && $"q3".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model"),
collect_set(when($"q1".isNotNull && $"q2".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model2")
)
需要 15-16 小时。
我的问题是。
1) 用于数据帧的 groupby 是否与用于 rdd'd 的 groupbykey 相同(执行所有数据的混洗),如果确实如此,是否会迁移到数据集方法 groupbykey.reducegroupes 甚至 rdd reducebykey 提高性能?
2) 还是资源有问题?有 200 个任务执行分组,增加这些任务的数量会有帮助吗?我该怎么做呢?
这就是我运行它的方式
spark-submit \
--class Main \
--master yarn \
--deploy-mode client \
--num-executors 200 \
--executor-cores 20 \
--driver-memory 8G \
--executor-memory 16G \
--files hive-site.xml#hive-site.xml \
--conf spark.task.maxFailures=10 \
--conf spark.executor.memory=16G \
--conf spark.app.name=spark-job \
--conf spark.yarn.executor.memoryOverhead=4096 \
--conf spark.yarn.driver.memoryOverhead=2048 \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.consolidateFiles=true \
--conf spark.broadcast.compress=true \
--conf spark.shuffle.compress=true \
--conf spark.shuffle.spill.compress=true \
【问题讨论】:
-
不同分区的处理时间是否可比?也许你有数据倾斜。
-
@gor 一些任务在 8 小时内完成,少数任务在 16 小时内完成。大多数需要 12-13 小时。所以它不是很倾斜。
标签: apache-spark dataframe group-by