【Question Title】: How to run simple Spark app from Eclipse/Intellij IDE?
【Posted】: 2014-05-03 13:33:27
【Question】:

To simplify developing map reduce tasks that run on Hadoop before actually deploying them to Hadoop, I test with a simple map reducer I wrote:

object mapreduce {
  import scala.collection.JavaConversions._

  val intermediate = new java.util.HashMap[String, java.util.List[Int]]
                                                  //> intermediate  : java.util.HashMap[String,java.util.List[Int]] = {}
  val result = new java.util.ArrayList[Int]       //> result  : java.util.ArrayList[Int] = []

  def emitIntermediate(key: String, value: Int) {
    if (!intermediate.containsKey(key)) {
      intermediate.put(key, new java.util.ArrayList)
    }
    intermediate.get(key).add(value)
  }                                               //> emitIntermediate: (key: String, value: Int)Unit

  def emit(value: Int) {
    println("value is " + value)
    result.add(value)
  }                                               //> emit: (value: Int)Unit

  def execute(data: java.util.List[String], mapper: String => Unit, reducer: (String, java.util.List[Int]) => Unit) {

    for (line <- data) {
      mapper(line)
    }

    for (keyVal <- intermediate) {
      reducer(keyVal._1, intermediate.get(keyVal._1))
    }

    for (item <- result) {
      println(item)
    }
  }                                               //> execute: (data: java.util.List[String], mapper: String => Unit, reducer: (St
                                                  //| ring, java.util.List[Int]) => Unit)Unit

  def mapper(record: String) {
    var jsonAttributes = com.nebhale.jsonpath.JsonPath.read("$", record, classOf[java.util.ArrayList[String]])
    println("jsonAttributes are " + jsonAttributes)
    var key = jsonAttributes.get(0)
    var value = jsonAttributes.get(1)

    println("key is " + key)
    var delims = "[ ]+";
    var words = value.split(delims);
    for (w <- words) {
      emitIntermediate(w, 1)
    }
  }                                               //> mapper: (record: String)Unit

  def reducer(key: String, listOfValues: java.util.List[Int]) = {
    var total = 0
    for (value <- listOfValues) {
      total += value;
    }

    emit(total)
  }                                               //> reducer: (key: String, listOfValues: java.util.List[Int])Unit
  var dataToProcess = new java.util.ArrayList[String]
                                                  //> dataToProcess  : java.util.ArrayList[String] = []
  dataToProcess.add("[\"test1\" , \"test1 here is another test1 test1 \"]")
                                                  //> res0: Boolean = true
  dataToProcess.add("[\"test2\" , \"test2 here is another test2 test1 \"]")
                                                  //> res1: Boolean = true

  execute(dataToProcess, mapper, reducer)         //> jsonAttributes are [test1, test1 here is another test1 test1 ]
                                                  //| key is test1
                                                  //| jsonAttributes are [test2, test2 here is another test2 test1 ]
                                                  //| key is test2
                                                  //| value is 2
                                                  //| value is 2
                                                  //| value is 4
                                                  //| value is 2
                                                  //| value is 2
                                                  //| 2
                                                  //| 2
                                                  //| 4
                                                  //| 2
                                                  //| 2


  for (keyValue <- intermediate) {
      println(keyValue._1 + "->"+keyValue._2.size)//> another->2
                                                  //| is->2
                                                  //| test1->4
                                                  //| here->2
                                                  //| test2->2
   }


}

This allows me to run my mapreduce tasks in the Eclipse IDE on Windows before deploying to an actual Hadoop cluster. I would like to do something similar for Spark, that is, write Spark code in Eclipse and test it there before deploying it to a Spark cluster. Is this possible with Spark? Since Spark runs on top of Hadoop, does that mean I cannot run Spark without first installing Hadoop? In other words, can I run the code below using just the Spark libraries?:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

Taken from https://spark.apache.org/docs/0.9.0/quick-start.html#a-standalone-app-in-scala

If this is possible, which Spark libraries would I need to include in the project?

【Comments】:

  • spark.apache.org lists spark-core_2.10, version 0.9.0-incubating. I would start with its dependencies; you can find them here and here. If you did not create the project with a dependency-management plugin that can generate an Eclipse project, you will have to download the dependencies yourself. One of them appears to be hadoop-client.

Tags: java eclipse scala hadoop apache-spark


【Solution 1】:

Add the following to your build.sbt: libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1", and make sure your scalaVersion is set (e.g. scalaVersion := "2.10.3").
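For reference, a complete minimal build.sbt along those lines might look like the following sketch (the project name and version are placeholders, not taken from the original post; the Spark and Scala versions match those mentioned above):

```scala
// build.sbt -- minimal sketch for a Spark 0.9.x project
// name and version are placeholders; adjust to your project
name := "simple-project"

version := "1.0"

scalaVersion := "2.10.3"

// %% appends the Scala binary version, so this resolves to spark-core_2.10
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
```

With this in place, sbt pulls in spark-core and its transitive dependencies (including the Hadoop client libraries), so nothing needs to be downloaded by hand.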

Also, if you are just running the program locally, you can skip the last two arguments to SparkContext, like this: val sc = new SparkContext("local", "Simple App").
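Putting that together, a local-mode version of the quick-start app could look like this sketch (it assumes the spark-core dependency is on the classpath; the file path is a placeholder for any text file on your machine):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    // The "local" master runs Spark in-process: no cluster, no Hadoop
    // installation, and no jar list needed, so this can be launched
    // directly from the IDE's Run action
    val sc = new SparkContext("local", "Simple App")
    val logData = sc.textFile("README.md", 2).cache() // placeholder path
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    sc.stop() // shut the local context down cleanly
  }
}
```

Because everything runs inside a single JVM, this is exactly the Spark analogue of testing the hand-rolled map reducer in the Eclipse worksheet above.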

Finally, Spark can run on top of Hadoop, but it can also run in standalone mode. See: https://spark.apache.org/docs/0.9.1/spark-standalone.html

【Discussion】:

  • You will also need to regenerate the Eclipse project with sbteclipse, and possibly refresh the project in Eclipse.
  • Does this work directly? Don't you need to package a jar and submit it with spark-submit?