【发布时间】:2019-09-09 19:51:06
【问题描述】:
我在这里看到过这个问题,但他们基本上专注于火花流,我找不到适合批量工作的解决方案。这个想法是循环几天,并在每次迭代/天更新有关前一天的信息(用于当前迭代)。代码如下所示:
var prevIterDataRdd = // some RDD
days.foreach(folder => {
val previousData : Map[String, Double] = parseResult(prevIterDataRdd)
val broadcastMap = sc.broadcast(previousData)
val (result, previousStatus) =
processFolder(folder, broadcastMap)
// store result
result.write.csv(outputPath)
// updating the RDD that enables me to extract previousData to update broadcast
val passingPrevStatus = prevIterDataRdd.subtractByKey(previousStatus)
prevIterDataRdd = previousStatus.union(passingPrevStatus)
broadcastMap.unpersist(true)
broadcastMap.destroy()
})
使用broadcastMap.destroy() 无法运行,因为它不允许我再次使用broadcastMap(我实际上不明白,因为它应该是完全不相关的——不可变的)。
我应该如何运行这个循环并在每次迭代时更新广播变量?
当使用 unpersist 方法时,我传递了 true 参数以使其阻塞。 sc.broadcast() 也被屏蔽了吗?
如果我再次广播,我真的需要unpersist()吗?
既然我正在创建一个新的广播变量,为什么我在使用destroy 后不能再次使用广播?
【问题讨论】:
标签: scala apache-spark broadcast