【问题标题】:scala Hashset contains duplicatesscala Hashset 包含重复项
【发布时间】:2018-10-02 13:38:45
【问题描述】:

我有一个可变哈希集,我在其中添加来自文本文件的元素,但也添加了重复项。 我的代码是

val machines = new mutable.HashSet[String]

val splitRdd = textFile.flatMap(line => {
val l =line.split("\\t").toList
machines.add(l(2))
machines})

splitRdd.foreach(rdd=> println(rdd))
splitRdd.saveAsTextFile(outputFile)

文本文件是:

0   0   m2  0   0   0  
0   0   m2  0   0   0  
0   0   m3  0   0   0  
0   0   m3  0   0   0  

我在 spark 的输出文件和标准输出中得到的是:

m2  
m2  
m2  
m3  
m3

除了添加了重复项之外,文件中还存在一个不存在的“m2”。我认为我的代码是正确的,有人知道为什么会这样吗?

【问题讨论】:

  • 什么是机器?你能更新代码吗?
  • machines是m2、m3相加的hashset
  • 但是输出的是splitRdd.foreach(rdd=> println(rdd)),不是哈希集,而是RDD的元素。您保存在文本文件中的是splitRdd.saveAsTextFile(outputFile),而不是哈希集。那么你怎么知道哈希集有重复呢?
  • 新的rdd不是和hashset一样吗?我想通过退回它,它会是一样的。另外,我正在尝试打印机器,但没有打印任何内容

标签: scala apache-spark duplicates hashset


【解决方案1】:

好吧,您似乎跳过了Spark Programming Guide(简短但强烈推荐的讲座)。特别是称为Understanding closures的部分:

关于 Spark 的一个难点是在跨集群执行代码时了解变量和方法的范围和生命周期。修改其范围之外的变量的 RDD 操作可能是一个常见的混淆源。

长话短说,Spark 中没有共享内存(不要将共享内存与所谓的shared variable 混淆,它们实际上并不是共享的)。您的代码的作用如下:

  • 它检测到machines 在闭包内被使用。
  • 将值序列化,发送到每个执行器节点,反序列化创建本地副本
  • 然后您的代码会修改这个本地副本,每个执行线程独立于其他线程工作。不涉及同步,因此您只需对各个分区进行重复数据删除。

您的代码可能local 模式下工作(取决于声明 machines 的确切上下文),但通常您应该看到驱动程序副本永远不会被修改,并且执行代码后为空。

你在这里真正应该做的是:

textFile.flatMap(_.split("\\t")).distinct.saveAsTextFile(outputFile)

这样,Spark 将对单个分区进行重复数据删除,对数据进行洗牌以确保剩余的副本驻留在相同的分区上,然后复制分区。

这里的外卖信息只是阅读手册。以后省去很多麻烦。

【讨论】:

  • 嗯,我已经读过了,但看起来,我错过了很多东西。我会再读一遍。谢谢!
猜你喜欢
  • 1970-01-01
  • 2013-04-20
  • 2022-07-05
  • 1970-01-01
  • 2023-01-17
  • 1970-01-01
  • 1970-01-01
  • 2011-07-27
相关资源
最近更新 更多