【问题标题】:Broadcasting updates on spark jobs广播有关 Spark 作业的更新
【发布时间】:2019-09-09 19:51:06
【问题描述】:

我在这里看到过这个问题,但他们基本上专注于火花流,我找不到适合批量工作的解决方案。这个想法是循环几天,并在每次迭代/天更新有关前一天的信息(用于当前迭代)。代码如下所示:

var prevIterDataRdd = // some RDD

days.foreach(folder => {
  val previousData : Map[String, Double] = parseResult(prevIterDataRdd)
  val broadcastMap = sc.broadcast(previousData)

  val (result, previousStatus) =
    processFolder(folder, broadcastMap)

  // store result
  result.write.csv(outputPath)

  // updating the RDD that enables me to extract previousData to update broadcast
  val passingPrevStatus = prevIterDataRdd.subtractByKey(previousStatus)
  prevIterDataRdd = previousStatus.union(passingPrevStatus)

  broadcastMap.unpersist(true)
  broadcastMap.destroy()
})

使用broadcastMap.destroy() 无法运行,因为它不允许我再次使用broadcastMap(我实际上不明白,因为它应该是完全不相关的——不可变的)。

我应该如何运行这个循环并在每次迭代时更新广播变量?

当使用 unpersist 方法时,我传递了 true 参数以使其阻塞。 sc.broadcast() 也被屏蔽了吗?

如果我再次广播,我真的需要unpersist()吗?

既然我正在创建一个新的广播变量,为什么我在使用destroy 后不能再次使用广播?

【问题讨论】:

    标签: scala apache-spark broadcast


    【解决方案1】:

    广播变量是不可变的,但您可以创建一个新的广播变量。 这个新的广播变量可以在下一次迭代中使用。

    您需要做的就是更改对新创建的广播的引用,从执行程序中取消保留旧广播并从驱动程序中销毁它。

    类级别定义变量,这将允许您更改驱动程序中广播变量的引用并使用destroy方法。

    object Main extends App {
    
      // defined and initialized at class level to allow reference change
      var previousData: Map[String, Double] = null
    
      override def main(args: Array[String]): Unit = {
        //your code
    
      }
    }
    

    您不能对变量使用 destroy 方法,因为驱动程序中不再存在引用。更改对新广播变量的引用可以解决问题。

    Unpersist 仅从执行程序中删除数据,因此当重新访问变量时,驱动程序会将其重新发送给执行程序。

    blocking = true 将允许您让应用程序在下次访问之前从执行程序中完全删除数据。

    sc.broadcast() - 没有官方文档说它是阻塞的。尽管一旦调用它,应用程序就会在运行下一行代码之前开始将数据广播给执行程序。因此,如果数据非常大,它可能会减慢您的应用程序。所以要小心你是如何使用它的。

    在销毁之前调用 unpersist 是一个很好的做法。这将帮助您完全摆脱执行程序和驱动程序的数据。

    【讨论】:

    • 在运行下一次迭代之前使用.unpersist(true) 是否有意义,这样我可以确定下次执行程序调用广播变量时​​它会获得它的新值?
    • 是的。您使用它的方式是正确的。您只需要定义和初始化您将在类级别广播的变量。
    猜你喜欢
    • 1970-01-01
    • 2016-05-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-02
    • 2015-08-26
    • 2018-11-12
    相关资源
    最近更新 更多