【问题标题】:How to fill Scala Seq of Sets with unique values from Spark RDD?如何用 Spark RDD 中的唯一值填充 Scala Seq of Sets?
【发布时间】:2016-10-14 10:37:34
【问题描述】:

我正在使用 Spark 和 Scala。我有一个 Array[String] 的 RDD,我将对其进行迭代。 RDD 包含(name, age, work, ...) 等属性的值。我正在使用可变字符串集序列(称为attributes)来收集每个属性的所有唯一值。

把RDD想象成这样:

("name1","21","JobA")
("name2","21","JobB")
("name3","22","JobA")

最后我想要这样的东西:

attributes = (("name1","name2","name3"),("21","22"),("JobA","JobB"))

我有以下代码:

val someLength = 10
val attributes = Seq.fill[mutable.Set[String]](someLength)(mutable.Set())
val splitLines = rdd.map(line => line.split("\t"))

lines.foreach(line => {
  for {(value, index) <- line.zipWithIndex} {
    attributes(index).add(value)
    // #1
  }
})

// #2

当我调试并停在标有 #1 的行时,一切都很好,attributes 正确填充了唯一值。

但是在循环之后,在第 #2 行,attributes 再次为空。调查显示,该属性是一系列集合,大小均为 0。

Seq()
Seq()
...

我做错了什么?是否有某种我不知道的范围界定?

【问题讨论】:

  • 出于专业好奇心提出的一个问题:如果您现在正在学习 Spark,为什么要使用 RDD API?为什么不是数据集/数据框?
  • @maasg 我不是 OP,但如果我刚开始使用 Spark,我可能会从较低的抽象级别开始,然后逐步升级到 DataSets/Frames。尽管从技术上讲,这可能不会“提高”抽象级别,但仍有一些事情是您无法做到的,除非您采用原始 RDD。
  • @maasg 我同意 Yuval。我觉得与 DataSets/DataFrames 相比,使用 RDD 学习 Spark 更容易且信息量更大。

标签: scala apache-spark set seq


【解决方案1】:

答案在于 Spark 是一个分布式引擎。我会给你一个关于你所面临的问题的粗略想法。这里每个RDD 中的元素都被分桶到Partitions 中,每个Partition 都可能存在于不同的节点上。

当您编写rdd1.foreach(f) 时,f 被包裹在一个闭包中(它会获取相应对象的副本)。现在,这个闭包被序列化,然后被发送到每个节点,它被应用于Partition 中的每个元素。

在这里,您的f 将在其包装的闭包中获得attributescopy,因此当执行f 时,它会与attributes 的副本交互,而不是与您想要的attributes 交互.这会导致您的attributes 被忽略而没有任何更改。

我希望问题现在已经清楚了。

val yourRdd = sc.parallelize(List(
    ("name1","21","JobA"),
    ("name2","21","JobB"),
    ("name3","22","JobA")
))

val yourNeededRdd = yourRdd
  .flatMap({ case (name, age, work) => List(("name", name), ("age", age), ("work", work)) })
  .groupBy({ case (attrName, attrVal) => attrName })
  .map({ case (attrName, group) => (attrName, group.toList.map(_._2).distinct })

// RDD(
//     ("name", List("name1", "name2", "name3")),
//     ("age", List("21", "22")),
//     ("work", List("JobA", "JobB"))
// )

// Or

val distinctNamesRdd = yourRdd.map(_._1).distinct
// RDD("name1", "name2", "name3")

val distinctAgesRdd = yourRdd.map(_._2).distinct
// RDD("21", "22")

val distinctWorksRdd = yourRdd.map(_._3).distinct
// RDD("JobA", "JobB")

【讨论】:

  • 感谢您的解释,我还在学习 Spark,所以这对您有很大帮助。我的问题现在很清楚,但我还没有解决方案。我可能会尝试仅通过 Spark 转换/操作来实现我想要的结果。
  • 感谢您的解决方案。对于可变数量的属性,这变得更加复杂,特别是因为我最初想从文件中读取属性名称而不是硬编码它们。
猜你喜欢
  • 2021-10-25
  • 2019-01-17
  • 2015-02-27
  • 2016-05-29
  • 1970-01-01
  • 2021-02-03
  • 2016-03-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多