【问题标题】:rdd.map invoking a function twice rather than oncerdd.map 调用函数两次而不是一次
【发布时间】:2019-11-07 03:54:44
【问题描述】:

我是 Scala 编程的新手,目前正在使用 RDD。我正在尝试将 RDD 传递给一个函数,并希望该函数返回,以便我可以将它存储到一个新的 RDD。出于我使用地图的目的。但是 map 调用了该函数两次,而 RDD 中只有一个条目。当我使用 collect.foreach() 而不是 map 时,它工作正常,但我无法将更新值保存在新的 RDD 中,因为它在 Unit 中返回值。

此代码从更新函数返回值,但调用该函数两次:

temp_rdd = my_rdd.map{x => update(x)}

而这个完美地调用它一次,但我无法修改 RDD 值:

my_rdd.collect().foreach{x => update(x)}

foreach 函数返回“单位”格式,因为我无法将其保存在新的 RDD 中。我正在寻找一种将更新后的值存储在新 RDD 中的方法。

【问题讨论】:

  • 函数将被调用的次数与调用“temp_rdd”操作的次数一样多。每次“temp_rdd”具体化时,都会调用函数。例如,如果调用动作“temp_rdd.collect()” 3 次,则所有转换(在本例中为“map”)将被调用 3 次。为避免这种情况,可以使用缓存。

标签: scala apache-spark rdd


【解决方案1】:

来自https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

map 是一个转换,它将每个数据集元素传递给一个函数并返回一个表示结果的新 RDD。 Spark 中的所有转换都是惰性的,并且在操作需要将结果返回给驱动程序时进行计算。默认情况下,每个转换后的 RDD 可能会在您每次对其执行操作时重新计算(或者您可以使用 .cache() 将 RDD 持久化到内存中)。

另一方面,actions(例如,collectreduce)在对 RDD 运行计算后向驱动程序返回一个值(不是 RDD)。

下面是一个缓存 RDD 以防止其计算多次的示例

val array = Array("1", "2", "3")
val rdd = sc.parallelize(array)
var i = 0
val mapRdd = rdd.map(s"$i: " + _)
mapRdd.take(3).foreach(println) // mapRdd is computed here...
// Output
// 0: 1
// 0: 2
// 0: 3

i = i + 1
mapRdd.take(3).foreach(println) // ... and here
// Output
// 1: 1
// 1: 2
// 1: 3

val cachedMapRdd = rdd.map(s"$i: " + _).cache()
cachedMapRdd.take(3).foreach(println) // cachedMapRdd is computed here...
// Output
// 1: 1
// 1: 2
// 1: 3

i = i + 1
cachedMapRdd.take(3).foreach(println) // ... but not here
// Output
// 1: 1
// 1: 2
// 1: 3

【讨论】:

  • 感谢您的回答。我已经使用 .cache() 实现了我的程序,我的问题已经解决。我已经设置了 2 个工作节点,但我的程序的执行是在一个工作人员中进行的,而不是在 2 个工作人员中并行执行。对这个问题有什么建议吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-01-01
  • 2017-02-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多