【Posted】: 2019-12-25 12:54:20
【Problem Description】:
I am trying to join two DataFrames, one with roughly 10 million records and the other about a third of that size. Since the smaller DataFrame fits comfortably in executor memory, I perform a broadcast join and then write out the result:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{broadcast, size}
import spark.implicits._

val df = spark.read.parquet("/plablo/data/tweets10M")
  .select("id", "content", "lat", "lon", "date")

val fullResult = FilterAndClean.performFilter(df, spark)
  .select("id", "final_tokens")
  .filter(size($"final_tokens") > 1)

val fullDFWithClean = {
  df.join(broadcast(fullResult), "id")
}

fullDFWithClean
  .write
  .partitionBy("date")
  .mode(saveMode = SaveMode.Overwrite)
  .parquet("/plablo/data/cleanTokensSpanish")
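Before forcing the broadcast, a rough sanity check of the broadcast side's size can help. This is a minimal sketch, assuming fullResult is the only cached dataset (otherwise the reported bytes include other cached data):

// Cache and materialize the broadcast side, then read the in-memory size
// reported by the SparkContext (the "Storage" tab of the UI shows the same numbers).
fullResult.cache()
println(s"fullResult rows: ${fullResult.count()}")

val cachedBytes = spark.sparkContext.getRDDStorageInfo.map(_.memSize).sum
println(s"Bytes cached so far (~ size of fullResult if nothing else is cached): $cachedBytes")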
After a while, I get this error:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:125)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:88)
at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:209)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:141)
at org.apache.spark.sql.execution.FileSourceScanExec.doProduceVectorized(DataSourceScanExec.scala:392)
at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:315)
.....
This question addresses the same problem. In the comments it is suggested that increasing spark.sql.broadcastTimeout could fix the issue, but even after setting a much larger value (5000 seconds) I still get the same error (although, of course, much later).
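For reference, this is roughly how those settings can be changed on an existing session (the values are only illustrative; note that the auto-broadcast threshold only matters if the explicit broadcast() hint is removed from the join):

// Raise the time Spark waits for the broadcast side to be built and shipped
// (the default is 300 seconds; 5000 is the value mentioned above).
spark.conf.set("spark.sql.broadcastTimeout", "5000")

// Disable automatic broadcast joins so Spark falls back to a sort-merge join;
// this only takes effect once the explicit broadcast() hint is removed.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")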
The original data is partitioned by the date column, and the function that produces fullResult performs a series of narrow transformations and filters, so I assume the partitioning is preserved.
The physical plan confirms that Spark will perform a BroadcastHashJoin:
*Project [id#11, content#8, lat#5, lon#6, date#150, final_tokens#339]
+- *BroadcastHashJoin [id#11], [id#363], Inner, BuildRight
   :- *Project [id#11, content#8, lat#5, lon#6, date#150]
   :  +- *Filter isnotnull(id#11)
   :     +- *FileScan parquet [lat#5,lon#6,content#8,id#11,date#150] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://geoint1.lan:8020/plablo/data/tweets10M], PartitionCount: 182, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<lat:double,lon:double,content:string,id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *Project [id#363, UDF(UDF(UDF(content#360))) AS final_tokens#339]
         +- *Filter (((UDF(UDF(content#360)) = es) && (size(UDF(UDF(UDF(content#360)))) > 1)) && isnotnull(id#363))
            +- *FileScan parquet [content#360,id#363,date#502] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://geoint1.lan:8020/plablo/data/tweets10M], PartitionCount: 182, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<content:string,id:int>
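(The plan above can be reproduced by calling explain on the joined DataFrame before writing it out:)

fullDFWithClean.explain()                    // prints the physical plan only, as shown above
// fullDFWithClean.explain(extended = true)  // also prints the parsed, analyzed, and optimized plans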
I believe that, given the size of my data, this operation should be fairly fast (4 executors with 5 cores and 4 GB of RAM each, running in cluster mode on YARN).
Any help is appreciated.
【Discussion】:
- Do you think some partitions of your first dataset are too large, so that its result and the broadcast dataset end up too big? You could check the size of each partition and the distribution of the id values to verify this (a rough sketch of such a check follows this discussion).
- I don't think that is the problem; id is a unique row identifier, so it should be evenly distributed.
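A minimal sketch of the check suggested in the first comment (column and variable names follow the question; spark_partition_id is a built-in function):

import org.apache.spark.sql.functions.{spark_partition_id, desc}

// Rows per partition of the broadcast side: very uneven counts would hint at skew.
fullResult
  .groupBy(spark_partition_id().as("partition"))
  .count()
  .orderBy(desc("count"))
  .show(20)

// Distribution of the join key: with a unique id, every count should be 1.
df.groupBy("id").count().orderBy(desc("count")).show(20)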
Tags: scala apache-spark apache-spark-sql apache-spark-dataset