【问题标题】:Distributed Map in Scala SparkScala Spark 中的分布式地图
【发布时间】:2014-07-13 16:24:53
【问题描述】:

Spark 是否支持分布式地图集合类型?

如果我有一个键值对的 HashMap[String,String] ,可以将其转换为分布式 Map 集合类型吗?要访问我可以使用“过滤器”的元素,但我怀疑它的性能和 Map 一样好?

【问题讨论】:

标签: scala apache-spark


【解决方案1】:

因为我发现了一些新信息,所以我想我会把我的 cmets 变成一个答案。 @maasg 已经涵盖了标准的lookup 函数我想指出你应该小心,因为如果RDD 的分区器是None,那么无论如何查找只会使用过滤器。参考 spark 顶部的 (K,V) 存储,看起来这正在进行中,但是已经提出了一个可用的拉取请求here。这是一个示例用法。

import org.apache.spark.rdd.IndexedRDD

// Create an RDD of key-value pairs with Long keys.
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing
// the entries.
val indexed = IndexedRDD(rdd).cache()

// Perform a point update.
val indexed2 = indexed.put(1234L, 10873).cache()
// Perform a point lookup. Note that the original IndexedRDD remains
// unmodified.
indexed2.get(1234L) // => Some(10873)
indexed.get(1234L) // => Some(0)

// Efficiently join derived IndexedRDD with original.
val indexed3 = indexed.innerJoin(indexed2) { (id, a, b) => b }.filter(_._2 != 0)
indexed3.collect // => Array((1234L, 10873))

// Perform insertions and deletions.
val indexed4 = indexed2.put(-100L, 111).delete(Array(998L, 999L)).cache()
indexed2.get(-100L) // => None
indexed4.get(-100L) // => Some(111)
indexed2.get(999L) // => Some(0)
indexed4.get(999L) // => None

拉取请求似乎很受欢迎,并且可能会包含在未来版本的 spark 中,因此在您自己的代码中使用该拉取请求可能是安全的。这是JIRA ticket,以防你好奇

【讨论】:

  • 很好 :) 谢谢。只是好奇,但您是如何意识到这一点的? Spark 邮件列表?
  • @blue-sky 我在邮件列表中,但实际上我发现这一点的方式是浏览spark jira tickets
【解决方案2】:

快速回答:部分。

您可以将Map[A,B] 转换为RDD[(A,B)],方法是首先将映射强制转换为(k,v) 对的序列,但这样做您可以放松映射的键必须是集合的约束。 IE。你失去了Map 结构的语义。

从实际的角度来看,您仍然可以使用 kvRdd.lookup(element) 将元素解析为其对应的值,但结果将是一个序列,因为您无法保证如前所述存在单个查找值。

一个让事情变得清晰的 spark-shell 示例:

val englishNumbers = Map(1 -> "one", 2 ->"two" , 3 -> "three")
val englishNumbersRdd = sc.parallelize(englishNumbers.toSeq)

englishNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one) 

val spanishNumbers = Map(1 -> "uno", 2 -> "dos", 3 -> "tres")
val spanishNumbersRdd = sc.parallelize(spanishNumbers.toList)

val bilingueNumbersRdd = englishNumbersRdd union spanishNumbersRdd

bilingueNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one, uno)

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2020-08-09
  • 2018-04-17
  • 2021-01-19
  • 1970-01-01
  • 1970-01-01
  • 2017-04-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多