【问题标题】:Spark Dataframe join Exception thrown in awaitResult在 awaitResult 中引发 Spark Dataframe join 异常
【发布时间】:2019-12-25 12:54:20
【问题描述】:

我正在尝试加入两个 Dataframe,一个大约有 1000 万条记录,另一个大约是其中的 1/3。由于较小的 DataFrame 很适合 executor 的内存,所以我执行广播连接,然后写出结果:

val df = spark.read.parquet("/plablo/data/tweets10M")
  .select("id", "content", "lat", "lon", "date")
val fullResult = FilterAndClean.performFilter(df, spark)
  .select("id", "final_tokens")
  .filter(size($"final_tokens") > 1)
val fullDFWithClean = {
  df.join(broadcast(fullResult), "id")
}
fullDFWithClean
    .write
    .partitionBy("date")
    .mode(saveMode = SaveMode.Overwrite)
    .parquet("/plablo/data/cleanTokensSpanish")

过了一会儿,我得到了这个错误:

org.apache.spark.SparkException: Exception thrown in awaitResult: 
at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:125)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:88)
at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:209)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:141)
at org.apache.spark.sql.execution.FileSourceScanExec.doProduceVectorized(DataSourceScanExec.scala:392)
at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:315)
.....

this question 解决了同样的问题。在 cmets 中,提到增加 spark.sql.broadcastTimeout 可以解决问题,但是在设置一个较大的值(5000 秒)后,我仍然得到同样的错误(当然,虽然要晚得多)。

原始数据按date 列分区,返回fullResult 的函数执行一系列窄转换并过滤数据,所以我假设分区被保留。

物理计划确认 spark 将执行BroadcastHashJoin

*Project [id#11, content#8, lat#5, lon#6, date#150, final_tokens#339]
+- *BroadcastHashJoin [id#11], [id#363], Inner, BuildRight
:- *Project [id#11, content#8, lat#5, lon#6, date#150]
:  +- *Filter isnotnull(id#11)
:     +- *FileScan parquet [lat#5,lon#6,content#8,id#11,date#150] 
Batched: true, Format: Parquet, Location: 
InMemoryFileIndex[hdfs://geoint1.lan:8020/plablo/data/tweets10M], 
PartitionCount: 182, PartitionFilters: [], PushedFilters: 
[IsNotNull(id)], ReadSchema: 
struct<lat:double,lon:double,content:string,id:int>
   +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *Project [id#363, UDF(UDF(UDF(content#360))) AS 
 final_tokens#339]
     +- *Filter (((UDF(UDF(content#360)) = es) && (size(UDF(UDF(UDF(content#360)))) > 1)) && isnotnull(id#363))
        +- *FileScan parquet [content#360,id#363,date#502] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://geoint1.lan:8020/plablo/data/tweets10M], PartitionCount: 182, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<content:string,id:int>

我相信,考虑到我的数据大小,这个操作应该会比较快(在 4 个执行器上,每个执行器有 5 个核心,并且在 YARN 上以集群模式运行 4g RAM)。

感谢任何帮助

【问题讨论】:

  • 您是否认为您的第一个数据集的某些分区太大,因此它的结果和广播数据集太大?您可以查看每个分区的大小和id值的分布情况来验证它。
  • 我认为这不是问题,id 是唯一的行标识符,所以它应该是均匀分布的

标签: scala apache-spark apache-spark-sql apache-spark-dataset


【解决方案1】:

在这种情况下,第一个问题是您尝试广播的数据帧有多大?值得估计它的大小(也请参见 this SO answerthis)。

请注意,Spark 的默认 spark.sql.autoBroadcastJoinThresholdonly 10Mb,因此您真的不应该广播非常大的数据集。

您对broadcast 的使用具有优先权,这可能会迫使Spark 做一些它本来会选择不做的事情。一个好的规则是仅在默认行为不可接受时才强制进行积极优化,因为积极优化通常会产生各种边缘条件,就像您正在经历的那样。

【讨论】:

  • 我听取了您的建议,但同时,我正在按 id 对两个 DataFrame 进行分区。在这种情况下,默认行为是执行 SortMergeJoin,如果我能正确地得到分区,它应该足够快。我会报告的
  • 这可能会有所帮助,但请记住,id 的分区已经强制对两个数据集进行随机播放,因此我不完全确定您要保存的内容,因为您尝试按顺序强制广播首先要避免洗牌...
  • 你是如何解决这个问题的?
  • 您好,我在胶水作业日志中发现了相同的错误消息,但我仍然无法修复它,这是我的问题:stackoverflow.com/questions/66135124/…,您能看一下吗?
【解决方案2】:

如果spark.task.maxDirectResultSize 没有增加,这也可能会失败。它的默认值为 1 兆字节 (1m)。试试spark.task.maxDirectResultSize=10g

【讨论】:

  • spark.task.maxDirectResultSize=10g 显示语法无效。
  • 您好,我在胶水作业日志中发现了相同的错误消息,但我仍然无法修复它,这是我的问题:stackoverflow.com/questions/66135124/…,您能看一下吗?
猜你喜欢
  • 2017-03-19
  • 2017-09-05
  • 2022-01-08
  • 2016-10-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多