【问题标题】:How to force DataFrame evaluation in Spark如何在 Spark 中强制执行 DataFrame 评估
【发布时间】:2017-07-31 13:21:32
【问题描述】:

有时(例如,用于测试和基准测试)我想强制执行在 DataFrame 上定义的转换。 AFAIK 调用像 count 这样的动作并不能确保所有 Columns 都被实际计算,show 可能只计算所有 Rows 的子集(参见下面的示例)

我的解决方案是使用df.write.saveAsTableDataFrame 写入HDFS,但这会使我的系统“杂乱无章”,并带有我不想再保留的表。

那么触发DataFrame 评估的最佳方式是什么?

编辑:

请注意,最近在 spark 开发者列表上也有一个讨论:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html

我做了一个小例子,它表明 DataFrame 上的 count 不会评估所有内容(使用 Spark 1.6.3 和 spark-master = local[2] 进行测试):

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception

使用相同的逻辑,这里是show 不评估所有行的示例:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception

编辑 2:对于 Eliasah:例外是这样说的:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
.
.
.
.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
.
.
.
.

【问题讨论】:

  • 我的回答会对你有所帮助,因为你的假设是错误的stackoverflow.com/questions/31383904/…
  • 其次,我不确定我是否理解您的问题:“这让我的系统“混乱”了我不想再保留的表格。”这是什么意思?
  • 您需要对 DataFrame 转换进行单元测试吗?
  • @eliasah 我更愿意在我的数据帧上调用一个没有副作用的操作,编写配置单元表是一个(不需要的)副作用,这就是我所说的混乱。
  • @eliasah 根据此讨论(apache-spark-developers-list.1001551.n3.nabble.com/…,参见 Matei Zaharia 的帖子),数据框上的count 似乎并未评估所有列。但如果这不是真的,我很高兴

标签: scala apache-spark


【解决方案1】:

我更喜欢使用df.save.parquet()。这确实增加了磁盘 I/O 时间,您可以在以后估算和减去,但您确信 spark 执行了您预期的每个步骤,并且没有通过惰性评估来欺骗您。

【讨论】:

  • 如何估计磁盘 I/O 时间 Dan?另外,收集功能会起作用吗?还是会增加一些开销时间?
  • 我假设您有一个“df”并且您正在执行“X”操作。要获得基线时间,只需执行 df.save.parquet() 并计时。然后你在 df 上做方法,然后做 df_prime.save.parquet()。第一个会告诉你保存 df 需要多长时间。
【解决方案2】:

有点晚了,但根本原因如下:countRDDDataFrame 的作用不同。

DataFrames 中进行了优化,因为在某些情况下,您不需要加载数据来实际知道它具有的元素数量(尤其是在您的情况下,不涉及数据混洗)。因此,在调用count 时实现的DataFrame 不会加载任何数据,也不会传递给您的异常抛出。您可以通过定义自己的DefaultSourceRelation 轻松地进行实验,并看到在DataFrame 上调用count 将始终以buildScan 方法结束,而没有requiredColumns,无论您有多少列确实选择了(参见org.apache.spark.sql.sources.interfaces 了解更多信息)。这实际上是一个非常有效的优化;-)

虽然在RDDs 中,没有这样的优化(这就是为什么人们应该尽可能尝试使用DataFrames)。因此,RDD 上的 count 执行所有沿袭并返回构成任何分区的迭代器的所有大小的总和。

调用dataframe.count 进入第一个解释,但调用dataframe.rdd.count 进入第二个解释,因为您确实从DataFrame 构建了RDD。请注意,调用dataframe.cache().count 会强制实现dataframe,因为您需要Spark 缓存结果(因此它需要加载所有数据并对其进行转换)。但它确实有缓存数据的副作用......

【讨论】:

  • cache 不是一个激发而不是强迫它实现的建议吗?
  • 为我的案子工作。谢谢
【解决方案3】:

看来df.cache.count 是要走的路:

scala> val myUDF = udf((i:Int) => {if(i==1000) throw new RuntimeException;i})
myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))

scala> val df = sc.parallelize(1 to 1000).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> df.withColumn("test",myUDF($"id")).show(10)
[rdd_51_0]
+---+----+
| id|test|
+---+----+
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
| 10|  10|
+---+----+
only showing top 10 rows

scala> df.withColumn("test",myUDF($"id")).count
res13: Long = 1000

scala> df.withColumn("test",myUDF($"id")).cache.count
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => int)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
.
.
.
Caused by: java.lang.RuntimeException

Source

【讨论】:

  • 虽然这似乎可行,但它有一个副作用(即缓存)
  • spark 并不总是必须执行您可能打算执行的所有操作以执行计数,这有时会起作用
【解决方案4】:

我想只需从DataFrame 获取底层rdd 并对其触发操作即可实现您的目标。

df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions

【讨论】:

  • 这很奇怪。看起来这个答案适用于原始问题,但有人仍然设法在没有提及任何理由或反馈的情况下投了几票。 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-12-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-11
相关资源
最近更新 更多