【问题标题】:How do I return multiple key-value pairs in Scala using Spark's map transformation?如何使用 Spark 的映射转换在 Scala 中返回多个键值对?
【发布时间】:2015-06-10 20:05:12
【问题描述】:

我是 Scala 和 Spark 的新手。我试图在地图转换期间返回多个键值对。我的输入数据是一个简单的 CSV 文件。

1、2、3 4、5、6 7、8、9

我的 Scala 脚本如下所示。

class Key(_i:Integer, _j:Integer) {
 def i = _i
 def j = _j
}
class Val(_x:Double, _y:Double) {
 def x = _x
 def y = _y
}
val arr = "1,2,3".split(",")
for(i <- 0 until arr.length) {
 val x = arr(i).toDouble
 for(j <- 0 until arr.length) {
  val y = arr(j).toDouble
  val k = new Key(i, j)
  val v = new Val(x, y)
  //note that i want to return the tuples, (k, v)
 }
}

我希望能够使用上面的 for 循环和数据结构来返回多个元组 (k, v)。类似于下面的代码。

val file = sc.textFile("/path/to/test.csv")
file.map(line => {
 val arr = line.split(",")
 for(i <- 0 until arr.length) {
  val x = arr(i).toDouble
  for(j <- (i+1) until arr.length) {
   val y = arr(j).toDouble
   val k = new Index(i,j)
   val v = new Val(x,y)
   (k,v)
  }
 }
}).collect //reduceByKey is not there, reduce is there, but not what i want

当我将上面的代码复制/粘贴到 lambda 表达式中(并在 Scala REPL shell 上运行)时,我收到以下错误:

错误:简单表达式的非法开始 val arr = line.split(",") ^

我也意识到我仍然停留在命令式/过程式编程思维中,所以请多多包涵(以及 Scala/Spark 的新手)。

【问题讨论】:

    标签: scala apache-spark scala-collections


    【解决方案1】:

    使用RDD.flatMapyield 来自for 循环的列表:

    val file = sc.textFile("/path/to/test.csv")
    file.flatMap { line =>
      val arr = line.split(",")
      for {
        i <- 0 until arr.length
        j <- (i + 1) until arr.length
      } yield {
        val x = arr(i).toDouble
        val y = arr(j).toDouble
        val k = new Index(i, j)
        val v = new Val(x, y)
        (k, v)
      }
    }.collect
    

    【讨论】:

    • Scala for 循环很神奇。我从来没有找到他们的文档,在这一点上我不敢问。
    【解决方案2】:

    您忘记了箭头后面的括号。只有当它是一个简单的表达式(一个表达式)时,您才能省略它们。

    file.map(line => {
        //multiple lines of code here
    })
    

    修改后的完整答案:

    case class Index(i:Integer, j:Integer)
    case class Val(x:Double, y:Double)
    
    val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
    data.flatMap(line=>{
    val arr = line.split(",")
     val doubleSeq = for(i <- 0 until arr.length) yield {
      val x = arr(i).toDouble
      for(j <- (i+1) until arr.length) yield {
       val y = arr(j).toDouble
       val k = Index(i,j)
       val v = Val(x,y)
       (k,v)
      }
     }
     doubleSeq.flatten
    })
    

    实际上有很多问题:

    • 请注意,我将您的类更改为案例类,因为它们是可序列化的。否则,您将需要实现Serializable
    • 我将map 更改为flatMap,以及flattened 你的数组作为一个flatMap 仍然会给你留下一个内部数组。现在,两者的结合将产生您的RDD[(Index, Val)],现在可以隐式地与reduceByKey 一起使用
    • 我使用yield 将您的for 循环转换为for 理解。你得到了Unit 的最终类型,因为for 循环的返回类型是Unit

    【讨论】:

    • 您的建议有所帮助。现在该错误消失了。但是当我添加 return 语句时,return (k,v),我得到以下信息:错误:返回方法定义之外。
    • 我没有看到...不要在scala中返回,最后的语句是返回值。我认为这会解决它
    • 你知道如何检查 lambda 函数是否正确吗?当我执行 file.map(line => {...}).collect 时,我看到的只是 Array[Unit] = Array((),(),...())。我接下来要做的是用同一个键减少所有值。但是,自动完成(点击选项卡)表明 reduceByKey 不是 org.apache.spark.rdd.RDD[Unit] 的成员。我仍然停留在 MapReduce 的心态中。
    • 我在您的帮助下发布了现在可以使用的代码。请注意,在上面的示例中,我使用 collect 来尝试检查该 RDD 中的实际内容。同时,我正在阅读这篇文章blog.cloudera.com/blog/2014/09/…,似乎表明Scala/Spark 中的map 函数有1 个输入和1 个输出,由于我想做的事情,我可能不得不使用flatMap 函数。
    • 是的,flatMap 似乎是正确的。与您的代码不完全相同,但这个问题stackoverflow.com/questions/29472603/… 也使用 flatMap 从每个输入行生成多个输出行。它可能会为您指明正确的方向?
    猜你喜欢
    • 2018-11-07
    • 2019-11-03
    • 1970-01-01
    • 2018-07-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-13
    • 1970-01-01
    相关资源
    最近更新 更多