【问题标题】:Persisting / Sharing a RDD in Spark Job Server在 Spark Job Server 中持久化/共享 RDD
【发布时间】:2016-02-29 23:46:11
【问题描述】:

我希望持久化来自 Spark 作业的 RDD,以便使用 Spark 作业服务器的所有后续作业都可以使用它。这是我尝试过的:

工作一:

package spark.jobserver

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try

object FirstJob extends SparkJob with NamedRddSupport {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob")
    val sc = new SparkContext(conf)
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {

    // the below variable is to be accessed by other jobs:
    val to_be_persisted : org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq("some text"))

    this.namedRdds.update("resultsRDD", to_be_persisted)
    return to_be_persisted
  }
}

工作 2:

package spark.jobserver

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try


object NextJob extends SparkJob with NamedRddSupport {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[4]").setAppName("NextJob")
    val sc = new SparkContext(conf)
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {

    val rdd = this.namedRdds.get[(String, String)]("resultsRDD").get
    rdd
  }
}

我得到的错误是:

{
  "status": "ERROR",
  "result": {
    "message": "None.get",
    "errorClass": "java.util.NoSuchElementException",
    "stack": ["scala.None$.get(Option.scala:313)", "scala.None$.get(Option.scala:311)", "spark.jobserver.NextJob$.runJob(NextJob.scala:30)", "spark.jobserver.NextJob$.runJob(NextJob.scala:16)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:278)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)", "java.lang.Thread.run(Thread.java:745)"]
  }

请修改上面的代码,使to_be_persisted 可以访问。 谢谢

编辑:

在使用以下方法编译和打包 scala 源代码后创建了 spark 上下文:

curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m'

调用 FirstJob 和 NextJob 使用:

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.FirstJob&context=test-context&sync=true'

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.NextJob&context=test-context&sync=true'

【问题讨论】:

  • 如果您必须从 NamedRDD 中受益,您必须在同一上下文中运行所有作业。你这样做吗?
  • 是的 noorul,我对这两个工作使用相同的上下文。
  • @vdep 你能否分享一下你用于此代码 sn-p 的依赖 jar 详细信息,因为我想写一个类似的并且我有 maven 的依赖问题
  • @dileepVikram,不幸的是我现在没有。我在以前的组织中从事此工作。

标签: scala apache-spark spark-jobserver


【解决方案1】:

这里似乎有两个问题:

  1. 如果您使用的是最新的 spark-jobserver 版本 (0.6.2-SNAPSHOT),则存在一个关于 NamedObjects 无法正常工作的开放错误 - 似乎符合您的描述:https://github.com/spark-jobserver/spark-jobserver/issues/386 .

  2. 你也有一个小的类型不匹配 - 在 FirstJob 中你坚持一个 RDD[String],而在 NextJob 中你试图获取一个 RDD[(String, String)] - 在 NextJob 中,应该读取 val rdd = this.namedRdds.get[String]("resultsRDD").get)。

我已经用 spark-jobserver 版本 0.6.0 和上面所说的小类型修复尝试了你的代码,它可以工作。

【讨论】:

  • 感谢 Tzach,它有效。 (它也适用于 spark 1.5.2 和 sjs 0.6.1)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-07-17
  • 2016-01-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-07-12
相关资源
最近更新 更多