【问题标题】:Returning results from executor to driver从执行程序返回结果给驱动程序
【发布时间】:2020-05-27 19:27:53
【问题描述】:

我有一个 spark 应用程序,它基本上接收一个大数据集,对其执行一些计算,最后执行一些 IO 将其存储在数据库中。所有这些阶段都发生在执行程序上,驱动程序从每个任务中获取(收集)一个布尔值,表示该任务的成功/失败状态(例如,某些项目的计算或 IO 可能会失败)。

例如,以下是一个过于简化的沿袭(在实际实现中,有多个重新分区和计算步骤):

readSomeDataset()
  .repartition()
  .mapPartition { // do some calculation }
  .mapPartition { // do some IO }
  .collect()

问题: 根据计算结果,我想在驱动程序上做其他事情(比如发布一条消息说“计算成功”)。这需要对整个数据集执行一次,而不是针对单个分区,因此需要在驱动程序上执行。

但是,executors 上的 IO 需要很长时间,我不想在发布之前等待它完成。

有没有办法让执行者在处理任务的过程中将“消息”发回给驱动程序?

(我想到了像 Accumulators 这样的东西,但是,afaik 只有在执行器上的最终操作完成后它们才能使用)

【问题讨论】:

  • Scala 、python 还是 java?
  • 我正在使用 scala。但是,这在语言之间真的有区别吗?

标签: apache-spark


【解决方案1】:

Spark 是一个惰性框架,需要一个完整的作业(从读取到写入)来执行,它不能只执行一部分。

要在不重新处理的情况下进行这些更改,您可以缓存数据帧,以尽可能快地恢复,就像这样。

val calculatedDF = readSomeDataset()
  .repartition()
  .mapPartition { // do some calculation }
  .cache() // or persist if can't fit in memory of the executors

if (caculatedDF.map(checkEackAreOK).reduce(_ && _).head) { // a condition to see if the calculations are ok and an action to launch it
  println("correct calculation")
  calculatedDF
    .mapPartition { // do some IO }
    .collect()
} else {
  println("incorrect calculation")
}

【讨论】:

  • 是的,这是可能的。我试图避免缓存,因为这在时间和执行程序内存方面也有一些开销。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-09-15
  • 2018-01-11
  • 1970-01-01
  • 2020-06-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多