【问题标题】:How to replace RDD type of [String] with values of RDD type [String, Int]如何用 RDD 类型 [String, Int] 的值替换 [String] 的 RDD 类型
【发布时间】:2016-04-14 16:23:45
【问题描述】:

对于最初问题中的混淆,我们深表歉意。这是可重现示例的问题:

我的 rdd 是 [String],我的 rdd 是 [String, Long]。我想根据第二个String 与第一个String 的匹配得到[Long] 的rdd。示例:

//Create RDD
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks",
      "This code estimates pi by throwing darts at a circle"))
// tokenize, result: RDD[(String)]
val words = textFile.flatMap(line => line.split(" "))
// create index of distinct words, result:  RDD[(String,Long)]
val indexWords = words.distinct().zipWithIndex()

因此,我想要一个带有单词索引的 RDD,而不是 "Spark can also be used for compute intensive tasks" 中的单词。

再次抱歉,谢谢

【问题讨论】:

  • 我建议你看看map-function。
  • 由于每个 RDD 中可以有很多数组,那么您将使用哪对数组来获得结果?还是基于索引?
  • @iboss 根据keysx 的匹配,结果rdd 将是yvalues
  • 我想你把我们都弄糊涂了——示例代码中的数组实际上应该是 RDD 吗?换句话说,第一个 RDD 类型是 RDD[Array[String]] 还是 RDD[String]?如果是后者,并且您只是在示例中用数组“替换”RDD - 请不要。如果是前者 - 请使用 RDD 创建一个具有预期结果的完整示例。
  • "paired rdd of Array[String, Long]" - 仍然没有意义。是RDD[(String, Long)](确实可以用作PairRDD)还是RDD[Array[(String, Long)]]?如果是后者,请举一个完整的例子。如果是前者 - 为什么在示例中将 RDD 替换为 Arrays?

标签: scala apache-spark rdd


【解决方案1】:

如果我的理解正确,您对Spark can also be used for compute intensive tasks 中也出现的作品索引感兴趣。

如果是这样 - 这里有两个输出相同但性能特征不同的版本:

val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ")

// option 1 - use join:
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w)
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index }

// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index }

第一个选项使用我们感兴趣的索引创建第二个 RDD,使用 keyBy 将其转换为 PairRDD(键 == 值!),joins 它与您的 indexWords RDD 然后映射来只获取索引。

第二个选项应该只在“有趣的词”列表不会太大的情况下使用 - 所以我们可以将它作为一个列表(而不是RDD),让 Spark 序列化它并发送到工人为每个任务使用。然后我们使用collect(f: PartialFunction[T, U]),它应用这个部分函数来一次获得一个“过滤器”和一个“地图”——如果单词存在于列表中,我们只返回一个值,如果存在——我们返回索引。

【讨论】:

  • 感谢您的回答。我将选项 1 应用于我的数据并得到了这种类型的 rdd:org.apache.spark.rdd.RDD[org.apache.spark.rdd.RDD[Long]]。我的数据集是RDD[String],由句子组成,所以我使用map 将您的代码传递给每个句子。为了制作join,我在map 内部使用sc.parallelize(x)。最后,我得到了一个嵌套的 RDD,但我不知道如何获取 String 类型。
  • 那行不通,你不能创建RDD[RDD[..]],并且你不能在RDD转换中使用SparkContext(例如map),不要甚至尝试一下。我真的不明白你想要做什么(每个不同的单词是否应该在每个句子中映射到它的索引?期望的结果类型是什么?)所以我不能建议替代方案,但通常只要你认为你需要“嵌套 RDD”实际上应该使用 RDD.joinRDD.cartesian
  • 您的权利:我希望将句子中的每个单词映射到每个句子中的索引。索引的收集已经完成,您在一个句子中展示了如何进行映射。现在,我想通过map 转换将它应用到rdd。所需的输出是rdd[string],每个句子都带有整数(而不是单词)。
  • 嗨,您能否更新您的答案以将您的解决方案应用于我的示例中的变量 textFile(即,在两个初始句子上)。
【解决方案2】:

我收到了 SPARK-5063 的错误并给出了this answer,我找到了解决问题的方法:

//broadcast `indexWords`
val bcIndexWords = sc.broadcast(indexWords.collectAsMap)
// select `value` of `indexWords` given `key`
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))}
result.first()
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-08-17
    • 1970-01-01
    • 2017-01-29
    • 2017-10-10
    • 1970-01-01
    • 2018-11-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多