【问题标题】:Spark 1.5.1, Scala 2.10.5: how to expand an RDD[Array[String], Vector]Spark 1.5.1、Scala 2.10.5:如何扩展 RDD[Array[String], Vector]
【发布时间】:2016-02-06 06:11:49
【问题描述】:

我正在使用 Spark 1.5.1 和 Scala 2.10.5

对于 RDD 的每个元素,我都有一个 RDD[Array[String], Vector]

  • 我想把Array[String]中的每个String合并起来 用Vector创建一个元组(String, Vector),这一步将导致从初始RDD的每个元素创建几个元组

目标是通过构建一个元组的RDD来结束:RDD[(String, Vector)],这个RDD包含上一步创建的所有元组。

谢谢

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    你试过吗?

    // rdd: RDD[Array[String], Vector] - initial RDD
    val new_rdd = rdd
      .flatMap {
        case (array: Array[String], vec: Vector) => array.map(str => (str, vec))
      }
    

    玩具示例(我在 spark-shell 中运行):

    val rdd = sc.parallelize(Array((Array("foo", "bar"), 100), (Array("one", "two"), 200)))
    val new_rdd = rdd
      .map {
        case (array: Array[String], vec: Int) => array.map(str => (str, vec))
      }
      .flatMap(arr => arr)
    new_rdd.collect
    res14: Array[(String, Int)] = Array((foo,100), (bar,100), (one,200), (two,200))
    

    【讨论】:

      【解决方案2】:

      考虑一下:

      rdd.flatMap { case (arr, vec) => arr.map( (s) => (s, vec) ) }
      

      (第一个flatMap 可以让您获得RDD[(String, Vector)] 作为输出,而map 可以让您获得RDD[Array[(String, Vector)]]

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-08-17
        • 2021-09-28
        • 2018-03-03
        相关资源
        最近更新 更多