【发布时间】:2020-11-24 10:13:52
【问题描述】:
我是 Scala 的新手,我对它的工作原理有一些疑问。
我想做下一件事:给定值列表,我想并行构造一些模仿字典,例如:(1,2,3,4) -> ((1,1), (2,2), (3,3), (4,4) )。我知道如果我们处理并行集合,我们应该使用累加器。所以这是我的尝试:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.ListBuffer
class DictAccumulatorV2 extends AccumulatorV2[Int, ListBuffer[(Int, Int)]] {
private var dict:ListBuffer[(Int, Int)]= new ListBuffer[(Int, Int)]
def reset(): Unit = {
dict.clear()
}
def add(v: Int): Unit = {
dict.append((v, v))
}
def value():ListBuffer[(Int, Int)] = {
return dict
}
def isZero(): Boolean = {
return dict.isEmpty
}
def copy() : AccumulatorV2[Int, ListBuffer[(Int, Int)]] = {
// I do not understand how to code it correctly
return new DictAccumulatorV2
}
def merge(other:AccumulatorV2[Int, ListBuffer[(Int, Int)]]): Unit = {
// I do not understand how to code it correctly without reinitializing dict from val to var
dict = dict ++ other.value
}
}
object FirstSparkApplication {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MyFirstApp").setMaster("local")
val sc = new SparkContext(conf)
val accum = new DictAccumulatorV2()
sc.register(accum, "mydictacc")
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
var res = distData.map(x => accum.add(x))
res.count()
println(accum)
}
}
所以我想知道我做对了还是有什么错误。
总的来说,我对sc.parallelize 的工作方式也有疑问。它实际上是在我的机器上并行化工作还是只是虚构的代码串?我应该在setMaster 中放什么而不是"local"?如何查看任务在哪些节点上执行?任务是同时在所有节点上执行还是有一定的顺序?
【问题讨论】:
标签: scala apache-spark