【问题标题】:Extracting data from RDD in Scala/Spark从 Scala/Spark 中的 RDD 中提取数据
【发布时间】:2017-05-04 19:01:09
【问题描述】:

所以我有一个大型数据集,它是 stackoverflow 用户群的样本。该数据集中的一行如下:

<row Id="42" Reputation="11849" CreationDate="2008-08-01T13:00:11.640" DisplayName="Coincoin" LastAccessDate="2014-01-18T20:32:32.443" WebsiteUrl="" Location="Montreal, Canada" AboutMe="A guy with the attention span of a dead goldfish who has been having a blast in the industry for more than 10 years.&#xD;&#xA;&#xD;&#xA;Mostly specialized in game and graphics programming, from custom software 3D renderers to accelerated hardware pipeline programming." Views="648" UpVotes="337" DownVotes="40" Age="35" AccountId="33" />

我想从信誉中提取数字,在本例中为“11849”,从年龄中提取数字,在本例中为“35”,我想将它们作为浮点数。

该文件位于 HDFS 中,因此它采用 RDD 格式

 val linesWithAge = lines.filter(line => line.contains("Age="))    //This is filtering data which doesnt have age
    val repSplit = linesWithAge.flatMap(line => line.split("\"")) //Here I am trying to split the data where there is a "

所以当我用引号将其拆分时,声誉在索引 3 中,年龄在索引 23 中,但是我如何将它们分配给地图或变量,以便我可以将它们用作浮点数。 我还需要它为 RDD 上的每一行执行此操作。

编辑:

   val linesWithAge = lines.filter(line => line.contains("Age="))    //transformations from the original input data
   val repSplit = linesWithAge.flatMap(line => line.split("\""))
    val withIndex = repSplit.zipWithIndex
    val indexKey = withIndex.map{case (k,v) => (v,k)}
    val b = indexKey.lookup(3)
    println(b)

因此,如果向数组添加了一个索引,现在我已经成功地将它分配给一个变量,但我只能对 RDD 中的一个项目执行此操作,有谁知道我如何对所有项目执行此操作?

【问题讨论】:

  • 您正在寻找map 函数。快速搜索找到了这个例子:backtobazics.com/big-data/spark/apache-spark-map-example
  • 映射函数以什么函数为参数?我想为数据集中的每一行在 3 和 23 处收集索引。请你举个例子,因为我已经尝试过使用地图功能。
  • 一个函数,它接受一个数组并产生一个由两个数字组成的元组:f: Array[String] =&gt; (Int, Int) 也许你可以试试看?如果你还不知道怎么做,这里有很多学习资源。
  • 你能检查一下我所做的编辑吗@maasg
  • 你检查这些操作的结果了吗?你会得到什么样的结果?

标签: scala hadoop apache-spark


【解决方案1】:

我们要做的是将原始数据集中的每个元素(表示为 RDD)转换为包含 (Reputation, Age) 作为数值的元组。

一种可能的方法是使用字符串操作转换 RDD 的每个元素,以便提取元素“Age”和“Reputation”的值,如下所示:

// define a function to extract the value of an element, given the name
def findElement(src: Array[String], name:String):Option[String] = {
  for {
    entry <- src.find(_.startsWith(name))
    value <- entry.split("\"").lift(1)
  } yield value
}

然后我们使用该函数从每条记录中提取有趣的值:

val reputationByAge = lines.flatMap{line => 
    val elements = line.split(" ")
    for {
        age <- findElement(elements, "Age")
        rep <- findElement(elements, "Reputation")
    } yield (rep.toInt, age.toInt)
}

请注意,在执行此操作之前,我们不需要过滤“年龄”。如果我们处理没有“年龄”或“声誉”的记录,findElement 将返回None。此后for-comprehension 的结果将是None,并且记录将被flatMap 操作展平

解决这个问题的更好方法是意识到我们正在处理结构化的 XML 数据。 Scala 提供了对 XML 的内置支持,所以我们可以这样做:

import scala.xml.XML
import scala.xml.XML._

// help function to map Strings to Option where empty strings become None 
def emptyStrToNone(str:String):Option[String] = if (str.isEmpty) None else Some(str)

val xmlReputationByAge = lines.flatMap{line => 
    val record = XML.loadString(line)
    for {          
      rep <- emptyStrToNone((record \ "@Reputation").text)
      age <- emptyStrToNone((record \ "@Age").text)
    } yield (rep.toInt, age.toInt)
}

此方法依赖于 XML 记录的结构来提取正确的属性。和以前一样,我们使用Option 值和flatMap 的组合来删除不包含我们需要的所有信息的记录。

【讨论】:

    【解决方案2】:

    首先,您需要一个函数来提取行中给定键的值 (getValueForKeyAs[T]),然后执行以下操作:

    val rdd = linesWithAge.map(line => (getValueForKeyAs[Float](line,"Age"), getValueForKeyAs[Float](line,"Reputation")))
    

    这应该给你一个RDD[(Float,Float)]类型的rdd

    getValueForKeyAs 可以这样实现:

    def getValueForKeyAs[A](line:String, key:String) : A = {
        val res = line.split(key+"=")
        if(res.size==1) throw new RuntimeException(s"no value for key $key")
        val value = res(1).split("\"")(1)
        return value.asInstanceOf[A]
    }
    

    【讨论】:

    • 因为找不到密钥而使用RuntimeException 破坏程序?我不认为我会那样做。返回 Option 类型会是更好的选择。
    • 那么请随意提供您自己的答案。我认为在这种情况下我更喜欢 Exception 因为我知道我只处理包含所需键的行。如果键不存在,那么这是一个编程错误,这种情况下 RuntimeException 是合理的......
    • 如果键不存在,则数据不完整,这种情况一直在发生。
    猜你喜欢
    • 1970-01-01
    • 2021-02-03
    • 2017-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多