【问题标题】:How to construct (key, value) list from parallelized list in scala spark?如何从scala spark中的并行列表构造(键,值)列表?
【发布时间】:2020-11-24 10:13:52
【问题描述】:

我是 Scala 的新手,我对它的工作原理有一些疑问。 我想做下一件事:给定值列表,我想并行构造一些模仿字典,例如:(1,2,3,4) -> ((1,1), (2,2), (3,3), (4,4) )。我知道如果我们处理并行集合,我们应该使用累加器。所以这是我的尝试:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.ListBuffer

class DictAccumulatorV2 extends AccumulatorV2[Int, ListBuffer[(Int, Int)]] {
  private var dict:ListBuffer[(Int, Int)]= new ListBuffer[(Int, Int)]

  def reset(): Unit = {
    dict.clear()
  }

  def add(v: Int): Unit = {
    dict.append((v, v))
  }
  def value():ListBuffer[(Int, Int)] = {
    return dict
  }
  def isZero(): Boolean = {
    return dict.isEmpty
  }
  def copy() : AccumulatorV2[Int, ListBuffer[(Int, Int)]] = {
    // I do not understand how to code it correctly
    return new DictAccumulatorV2
  }
  def merge(other:AccumulatorV2[Int, ListBuffer[(Int, Int)]]): Unit = {
    // I do not understand how to code it correctly without reinitializing dict from val to var
    dict = dict ++ other.value
  }
}
object FirstSparkApplication {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyFirstApp").setMaster("local")
    val sc = new SparkContext(conf)

    val accum = new DictAccumulatorV2()
    sc.register(accum, "mydictacc")
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    var res = distData.map(x => accum.add(x))
    res.count()
    println(accum)
  }
}

所以我想知道我做对了还是有什么错误。

总的来说,我对sc.parallelize 的工作方式也有疑问。它实际上是在我的机器上并行化工作还是只是虚构的代码串?我应该在setMaster 中放什么而不是"local"?如何查看任务在哪些节点上执行?任务是同时在所有节点上执行还是有一定的顺序?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    (1,2,3,4) -> ((1,1), (2,2), (3,3), (4,4) )

    你可以在 Scala 中这样做

    val list = List(1,2,3,4)
    val dict = list.map(i => (i,i))
    

    Spark Accumulators 用作从 Spark 执行器到驱动程序的通信手段。

    如果您想并行执行上述操作,那么您可以从该列表中构造一个 RDD 并对其应用映射转换,如上所示。

    在 spark shell 中看起来像

    val list = List(1,2,3,4)
    val listRDD = sc.parallelize(list)
    val dictRDD = listRDD.map(i => (i,i))
    

    sc.parallelize 的工作原理 它使用您传递给函数的集合创建一个分布式数据集(Spark 术语中的 RDD)。 More information.

    它确实使您的工作并行化。 如果您将 Spark 作业提交到集群,那么您应该能够在运行 spark-submit 命令后看到 YARN 应用程序 ID 或 URL。您可以访问 YARN 应用程序 URL 并查看有多少执行程序正在处理该分布式数据集以及什么顺序它们是在其中进行的。

    我应该在 setMaster 中放什么而不是“本地”

    来自 Spark 文档 - 要连接的主 URL,例如“local”在本地运行一个线程,“local[4]”在本地运行 4 个内核,或“spark://master:7077”在 Spark 独立集群上运行。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-10-15
      • 2019-01-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多