【问题标题】:How to force Spark to evaluate DataFrame operations inline如何强制 Spark 内联评估 DataFrame 操作
【发布时间】:2019-02-21 21:25:10
【问题描述】:

根据Spark RDD docs

Spark 中的所有转换都是惰性的,因为它们不会立即计算结果...这种设计使 Spark 能够更高效地运行。

有时我需要对我的数据框进行某些操作当时和现在。但是由于数据帧操作是“惰性评估”(如上所述),当我在代码中编写这些操作时,几乎不能保证 Spark 会实际上内联执行这些操作其余的代码。例如:

val someDataFrame : DataFrame = getSomehow()
val someOtherDataFrame : DataFrame = getSomehowAlso()
// Do some stuff with 'someDataFrame' and 'someOtherDataFrame'

// Now we need to do a union RIGHT HERE AND NOW, because
// the next few lines of code require the union to have
// already taken place!
val unionDataFrame : DataFrame = someDataFrame.unionAll(someOtherDataFrame)

// Now do some stuff with 'unionDataFrame'...

因此(到目前为止)我的解决方法是在我的时间敏感数据帧操作之后立即运行 .show().count(),如下所示:

val someDataFrame : DataFrame = getSomehow()
val someOtherDataFrame : DataFrame = getSomehowAlso()
// Do some stuff with 'someDataFrame' and 'someOtherDataFrame'

val unionDataFrame : DataFrame = someDataFrame.unionAll(someOtherDataFrame)
unionDataFrame.count()  // Forces the union to execute/compute

// Now do some stuff with 'unionDataFrame'...

...强制Spark 立即执行数据帧操作,内联。

这对我来说感觉非常 hacky/kludgy。所以我问:有没有更普遍接受和/或有效的方法来强制数据帧操作按需发生(而不是延迟评估)?

【问题讨论】:

  • 对我来说,spark 似乎根本没有保留这些评估。我正在使用 Jupyter Notebook,当我连续两次调用 .show() 时,第二次仍然需要很长时间。我假设它重新计算了整个事情,即使我只是计算了它。有人可以确认吗?我正在寻找避免这些重新计算的方法。

标签: apache-spark lazy-evaluation distributed-computing rdd spark-dataframe


【解决方案1】:

你必须调用一个 action 来强制 Spark 做实际的工作。 变换不会触发这种效果,这也是喜欢的原因之一。


顺便说一句,我很确定 非常清楚什么时候必须在“此时此地”完成某件事,所以很可能你关注的是错误的点。


您能否确认count()show() 被视为“操作”

您可以在documentation 中看到Spark 的一些操作函数,其中列出了count()show() 不是,我之前也没用过,但感觉就像是一个动作——不做实际工作怎么能显示结果? :)

您是否暗示 Spark 会自动接受这一点,并(及时)进行联合?

是的! :)

会记住您调用的transformations,当action 出现时,它会在正确的时间执行它们!


需要记住的一点:由于这项政策,只有在动作出现时才进行实际工作,您不会在转换中看到逻辑错误(s ),直到动作发生!

【讨论】:

  • 感谢@gsamaras (+1) 如果您不介意的话,请回答两个快速跟进问题:(1) 您能否确认count()show() 被视为“操作 i>”,从而迫使 Spark 进行实际工作?并且 (2) 我对你的陈述很感兴趣“我很确定 spark 非常清楚什么时候必须“此时此地”做某事,所以你可能关注的是错误的点。 ”。但是,如果在我执行union(...) 之后,我需要立即对unionedDataFrame 进行“处理”。您是否暗示 Spark 会自动接受这一点,并(及时)进行联合?再次感谢!
  • show 是一个动作。见:spark.apache.org/docs/latest/api/scala/…
【解决方案2】:

我同意你的观点,在某些时候你想在需要时执行该操作。 例如,如果您正在使用 Spark 流式传输数据,并且您想要评估在每个 RDD 上完成的转换,而不是为每个 RDD 累积转换,然后突然对这一大型数据集运行一个操作。

现在,假设您有一个 DataFrame,并且您已经对其进行了所有转换,那么您可以使用sparkContext.sql("CACHE table <table-name>")

这个缓存是急切的缓存,这将触发这个 DataFrame 上的动作,并评估这个 DataFrame 上的所有转换。

【讨论】:

    猜你喜欢
    • 2017-07-31
    • 1970-01-01
    • 2018-02-16
    • 2016-06-28
    • 1970-01-01
    • 2012-12-19
    • 1970-01-01
    • 2011-06-13
    • 2015-09-22
    相关资源
    最近更新 更多