【问题标题】:How to perform one operation on each executor once in spark如何在火花中对每个执行器执行一次操作
【发布时间】:2017-02-22 06:27:51
【问题描述】:

我有一个存储在 S3 中的 weka 模型,大小约为 400MB。 现在,我有一些记录,我想在这些记录上运行模型并执行预测。

对于执行预测,我尝试的是,

  1. 将模型作为静态对象下载并加载到驱动程序上,并将其广播给所有执行程序。对预测 RDD 执行映射操作。 ----> 不起作用,因为在 Weka 中执行预测,需要修改模型对象,并且广播需要只读副本。

  2. 将模型作为静态对象下载并加载到驱动程序上,并在每个映射操作中将其发送到执行程序。 -----> 工作(效率不高,因为在每个地图操作中,我传递了 400MB 对象)

  3. 在驱动程序上下载模型并将其加载到每个执行程序上并缓存在那里。 (不知道该怎么做)

有人知道如何在每个执行器上加载模型一次并缓存它,以便其他记录不再加载它吗?

【问题讨论】:

标签: scala apache-spark weka partitioning


【解决方案1】:

你有两个选择:

1。创建一个带有表示数据的惰性 val 的单例对象:

    object WekaModel {
        lazy val data = {
            // initialize data here. This will only happen once per JVM process
        }
    }       

然后,您可以在 map 函数中使用惰性 val。 lazy val 确保每个工作 JVM 初始化他们自己的数据实例。不会为data 执行序列化或广播。

    elementsRDD.map { element =>
        // use WekaModel.data here
    }

优势

  • 效率更高,因为它允许您为每个 JVM 实例初始化一次数据。例如,当需要初始化数据库连接池时,这种方法是一个不错的选择。

缺点

  • 对初始化的控制较少。例如,如果您需要运行时参数,则初始化对象会比较棘手。
  • 如果需要,您无法真正释放或释放对象。通常这是可以接受的,因为操作系统会在进程退出时释放资源。

2。在 RDD 上使用mapPartition(或foreachPartition)方法,而不仅仅是map

这允许您初始化整个分区所需的任何内容。

    elementsRDD.mapPartition { elements =>
        val model = new WekaModel()

        elements.map { element =>
            // use model and element. there is a single instance of model per partition.
        }
    }

优势

  • 在对象的初始化和取消初始化方面提供更大的灵活性。

缺点

  • 每个分区都会创建并初始化对象的一个​​新实例。根据每个 JVM 实例有多少个分区,这可能是也可能不是问题。

【讨论】:

  • 你确定#1吗?我收到了序列化错误。还有,如果数据初始化依赖运行时参数怎么办?
  • 方法#1 不应该发生任何序列化。如果有,很可能您在 RDD 方法中引用了一个中间对象。关于你关于初始化的问题,确实比较难控制。您的运行时参数也需要静态可用(例如,通过系统属性或配置文件)。单例初始化不是 Spark 特有的。这是一个 Scala 主题。
  • 你能在 Java 中做同样的事情吗?
  • 关于这个缺点,Less control over initialization. For example, it's trickier to initialize your object if you require runtime parameters.。这正是我想要达到的目标。你有任何例子或者你见过这样做吗?我正在调用外部系统以获取数据库连接配置。所以理想情况下,我不想在每个执行程序上调用外部系统。我刚刚问了这个非常相似的问题。 stackoverflow.com/questions/47241882/…
  • 初始化数据库连接的一种可能方法是通过系统属性(例如System.getProperty("db.host")
【解决方案2】:

这对我来说比惰性初始化器更有效。我创建了一个初始化为 null 的对象级指针,并让每个执行器对其进行初始化。在初始化块中,您可以使用一次性代码。请注意,每个处理批次都会重置局部变量,但不会重置对象级变量。

object Thing1 {
  var bigObject : BigObject = null

  def main(args: Array[String]) : Unit = {
    val sc = <spark/scala magic here>
    sc.textFile(infile).map(line => {
      if (bigObject == null) {
         // this takes a minute but runs just once
         bigObject = new BigObject(parameters)  
      }
      bigObject.transform(line)
    })
  }
}

这种方法每个执行器只创建一个大对象,而不是其他方法的每个分区创建一个大对象。

如果您将 var bigObject : BigObject = null 放在主函数命名空间中,它的行为会有所不同。在这种情况下,它会在每个分区(即批处理)的开头运行 bigObject 构造函数。如果您有内存泄漏,那么这最终会杀死执行程序。垃圾收集也需要做更多的工作。

【讨论】:

  • 如果您的spark.executor.cores 大于1,这将多次调用new BigObject。惰性方法可防止并发初始化。
  • @Dan 你的意思是lazy var bigObject ...
  • @Dale,您的代码在技术上不是线程安全的,因为如果多个执行器同时运行,它们可以初始化您的全局对象。
【解决方案3】:

这是我们通常做的事情

  1. 定义一个做这些事情的单例客户端,以确保每个执行程序中只有一个客户端

  2. 有一个 getorcreate 方法来创建或获取客户端信息,通常让您有一个要为多个不同模型提供服务的通用服务平台,然后我们可以使用类似 concurrentmap 来确保线程安全和无计算

  3. getorcreate 方法将在 RDD 级别内调用,如 transform 或 foreachpartition,因此请确保 init 发生在执行程序级别

【讨论】:

    【解决方案4】:

    您可以通过广播一个带有惰性 val 的案例对象来实现这一点,如下所示:

    case object localSlowTwo {lazy val value: Int = {Thread.sleep(1000); 2}}
    val broadcastSlowTwo = sc.broadcast(localSlowTwo)
    (1 to 1000).toDS.repartition(100).map(_ * broadcastSlowTwo.value.value).collect
    

    三个线程的三个执行器上的事件时间线如下所示:

    从同一个 spark-shell 会话再次运行最后一行不再初始化:

    【讨论】:

      猜你喜欢
      • 2019-07-28
      • 2010-12-02
      • 2021-12-17
      • 2020-08-06
      • 1970-01-01
      • 1970-01-01
      • 2013-01-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多