【问题标题】:Spark: UDF executed many timesSpark:UDF多次执行
【发布时间】:2019-11-04 15:12:40
【问题描述】:

我有一个包含以下代码的数据框:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

现在检查日志,我发现每行 UDF 执行 3 次。如果我从“test.three”列中添加“test3”,则会再次执行 UDF。

谁能解释一下为什么?

这是否可以正确避免(在添加“测试”后不缓存数据帧,即使这样可行)?

【问题讨论】:

  • 什么意思?您正在调用 test 函数三遍。这就是为什么它被执行了三遍。不知道你为什么要把它变成一个 UDF。为什么不直接将 Map 设为 val?
  • 这只是一个展示spark行为的例子。对我来说,“测试”是一个包含结构的新列,然后访问结构的任何部分都不应再次执行 UDF。我怎么错了?
  • 我尝试打印架构,“test”的数据类型是Map,而不是结构。现在,如果 UDF 返回一个类似 Test(one String, two: String) 的案例类,而不是返回 Map,那么 test 确实是一个 Struct,但 UDF 的执行次数总是一样多。
  • 缓存应该根据这个答案工作:stackoverflow.com/a/40962714/1138523

标签: scala apache-spark apache-spark-sql


【解决方案1】:

如果您想避免多次调用 udf(如果 udf 是您工作中的瓶颈,这尤其有用),您可以按以下方式进行:

val testUDF = udf(test _).asNondeterministic()

基本上你告诉 Spark 你的函数不是确定性的,现在 Spark 确保它只被调用一次,因为多次调用它是不安全的(每次调用可能返回不同的结果)。

还要注意,这个技巧不是免费的,通过这样做,您对优化器施加了一些限制,这样做的一个副作用是,例如 Spark 优化器不会通过不确定的表达式推送过滤器,因此您会变成负责查询中过滤器的最佳位置。

【讨论】:

  • 不错!这个答案也属于这里:stackoverflow.com/questions/40320563/…
  • 在我的例子中,asNondeterministic 强制 UDF 只执行一次。使用explode(array(myUdf($"id"))) 解决方案,它仍然会执行两次。
  • @David Vrba 你是什么意思? :因此您需要负责查询中过滤器的最佳位置。
  • @thebluephantom 如果您的表达式是确定性的,Spark 优化器将通过它们推送过滤器。如果表达式是不确定的(udf.asNondeterministic() 就是这种情况),优化器不会推送它,所以你最好尽快调用过滤器。
  • 你能举个例子吗?请
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-12-24
相关资源
最近更新 更多