【发布时间】:2014-05-03 13:33:27
【问题描述】:
为了在把任务实际部署到 Hadoop 之前简化 map reduce 任务的开发,我用自己编写的一个简单的 map reducer 在本地进行测试:
/**
 * Minimal in-memory imitation of a map/reduce job, used to try out mapper and
 * reducer logic locally before deploying to a real cluster.
 *
 * State is kept in two object-level Java collections (`intermediate` and
 * `result`); they are never cleared, so calling `execute` more than once
 * accumulates on top of the previous run.
 *
 * The object body doubles as a driver: on initialisation it runs a word count
 * over two sample JSON records and prints the outcome.
 */
object mapreduce {
  // Explicit `.asScala` decorators replace the deprecated implicit
  // scala.collection.JavaConversions wrappers.
  import scala.collection.JavaConverters._

  /** Map-phase output: each key with every value emitted for it. */
  val intermediate = new java.util.HashMap[String, java.util.List[Int]]

  /** Reduce-phase output, in the order the reducer emitted it. */
  val result = new java.util.ArrayList[Int]

  /**
   * Records one (key, value) pair produced by a mapper.
   *
   * @param key   grouping key; its value list is created on first use
   * @param value value appended to the list stored under `key`
   */
  def emitIntermediate(key: String, value: Int): Unit = {
    if (!intermediate.containsKey(key)) {
      intermediate.put(key, new java.util.ArrayList[Int])
    }
    intermediate.get(key).add(value)
  }

  /**
   * Records one reducer result and echoes it to stdout.
   *
   * @param value reduced value to append to `result`
   */
  def emit(value: Int): Unit = {
    println("value is " + value)
    result.add(value)
  }

  /**
   * Runs the job: maps every input line, reduces every collected key, then
   * prints everything accumulated in `result`.
   *
   * @param data    input records, one per element
   * @param mapper  called once per record; expected to call `emitIntermediate`
   * @param reducer called once per key with all values collected for that key;
   *                expected to call `emit`
   */
  def execute(data: java.util.List[String], mapper: String => Unit, reducer: (String, java.util.List[Int]) => Unit): Unit = {
    for (line <- data.asScala) {
      mapper(line)
    }
    // Iterating the entries already yields the value list, so no second lookup is needed.
    for ((key, values) <- intermediate.asScala) {
      reducer(key, values)
    }
    for (item <- result.asScala) {
      println(item)
    }
  }

  /**
   * Word-count mapper. Reads `record` as a JSON array, takes element 0 as the
   * record key (only printed) and element 1 as the text, and emits `(word, 1)`
   * for every space-separated word of the text.
   *
   * @param record JSON array literal with at least two string elements
   */
  def mapper(record: String): Unit = {
    val jsonAttributes = com.nebhale.jsonpath.JsonPath.read("$", record, classOf[java.util.ArrayList[String]])
    println("jsonAttributes are " + jsonAttributes)
    val key = jsonAttributes.get(0)
    val value = jsonAttributes.get(1)
    println("key is " + key)
    // String.split keeps a leading empty token when the text starts with a
    // space; skip it so "" is never counted as a word.
    for (word <- value.split("[ ]+") if word.nonEmpty) {
      emitIntermediate(word, 1)
    }
  }

  /**
   * Word-count reducer: emits the sum of all values collected for `key`.
   *
   * @param key          the word being reduced (not used in the computation)
   * @param listOfValues every value emitted for `key` during the map phase
   */
  def reducer(key: String, listOfValues: java.util.List[Int]): Unit =
    emit(listOfValues.asScala.sum)

  // ---- Sample run --------------------------------------------------------

  // Left as a `var` so existing code that reassigns it keeps compiling.
  var dataToProcess = new java.util.ArrayList[String]
  dataToProcess.add("[\"test1\" , \"test1 here is another test1 test1 \"]")
  dataToProcess.add("[\"test2\" , \"test2 here is another test2 test1 \"]")

  execute(dataToProcess, mapper, reducer)
  // Output recorded from the worksheet run (key order is whatever the
  // HashMap iteration yields):
  //   jsonAttributes are [test1, test1 here is another test1 test1 ]
  //   key is test1
  //   jsonAttributes are [test2, test2 here is another test2 test1 ]
  //   key is test2
  //   value is 2
  //   value is 2
  //   value is 4
  //   value is 2
  //   value is 2
  //   2
  //   2
  //   4
  //   2
  //   2

  // Print how many values were collected per word.
  for ((word, counts) <- intermediate.asScala) {
    println(word + "->" + counts.size)
  }
  // Output recorded from the worksheet run:
  //   another->2
  //   is->2
  //   test1->4
  //   here->2
  //   test2->2
}
这让我可以在部署到实际的 Hadoop 集群之前,先在 Windows 上的 Eclipse IDE 中运行我的 mapreduce 任务。我想对 Spark 做类似的事情,也就是能够在 Eclipse 中编写 Spark 代码,并在部署到 Spark 集群之前进行测试。Spark 可以做到这一点吗?由于 Spark 运行在 Hadoop 之上,这是否意味着如果不先安装 Hadoop,我就无法运行 Spark?换句话说,我可以只使用 Spark 库来运行下面这段代码吗:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
/**
 * Standalone Spark example: counts how many lines of a text file contain the
 * letter "a" and how many contain the letter "b", then prints both counts.
 */
object SimpleApp {
  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Plain string literal: "$YOUR_SPARK_HOME" is NOT expanded by Scala, so
    // replace it with a real path before running.
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    // Arguments: master URL, application name, Spark home, jars to ship.
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    try {
      // Cached because the same data set is scanned twice below.
      val logData = sc.textFile(logFile, 2).cache()
      val numAs = logData.filter(line => line.contains("a")).count()
      val numBs = logData.filter(line => line.contains("b")).count()
      println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    } finally {
      // Always shut the context down, even if a job above fails.
      sc.stop()
    }
  }
}
取自https://spark.apache.org/docs/0.9.0/quick-start.html#a-standalone-app-in-scala
如果可以,我需要在项目中包含哪些 Spark 库?
【问题讨论】:
-
spark.apache.org 上给出的依赖是 spark-core_2.10,版本 0.9.0-incubating。我会从它的依赖项入手。你可以在 here 或 here 找到它们。如果你不是通过依赖项管理插件来生成 Eclipse 项目,就必须自己下载这些依赖项。其中一个似乎是 hadoop-client。
标签: java eclipse scala hadoop apache-spark