【问题标题】:Selecting particular column using Spark使用 Spark 选择特定列
【发布时间】:2016-08-03 18:52:10
【问题描述】:

我在 hdfs 中有一个以逗号 (,) 分隔的文件,我正在尝试使用 scala 提取第 6 列,因为我在下面的代码中编写了该文件

object WordCount {
 def main(args: Array[String])
 {
 val textfile = sc.textFile("/user/cloudera/xxx/xxx")
 val word = textfile.filter( x => x.length >  0 ).map(_.replaceAll("\\|",",").trim)
 val keys = word.map(a => a(5))
 keys.saveAsTextFile("/user/cloudera/xxx/Sparktest")
 }
}

但我在 HDFS 中得到的结果不是我想要的。

以前我的数据是:

MSH|^~\&|RQ|BIN|SMS|BIN|2019||ORU^R01|120330003918|J|2.2
PID|1|xxxxx|xxxx||TEST|Rooney|19761202|M|MR^^M^MR^MD^11|7|0371 HOES LANE^0371

现在我的数据是:

\
T
I
,
1
N
\
T
I
,
1
N
\
T
I

我希望我的结果是:

BIN
TEST 

我不知道我做错了什么。请帮忙

【问题讨论】:

  • 我为什么要投反对票?你能解释一下,以便我更正吗

标签: scala hadoop apache-spark


【解决方案1】:

您将 | 替换为 ,,但您没有用逗号拆分,所以 word 仍然具有 RDD[String] 类型,而不是您看起来的 RDD[Array[String]]期待。然后,a => a(5) 将每个字符串视为 chars 的数组,从而得到您所看到的结果。

不知道为什么首先要用逗号替换管道,您可以:

val word = textfile.filter(x => x.length >  0).map(_.split('|'))
val keys = word.map(a => a(5).trim)

【讨论】:

  • 谢谢!这解决了我的问题。我有两个疑问,我想知道何时在 scala 中使用 split() 以及如何提取多个列,例如;如果我想要 BIN 2019 作为我的结果,我可以使用 val keys = word.map(a => a(5)(6).trim) 吗?
  • 你可以在这里阅读splitalvinalexander.com/scala/scala-split-string-example;要获得由空格分隔的两列,您可以使用字符串插值:word.map(a => s"${a(5)} ${a(6)}").
  • 非常感谢您的帮助。
【解决方案2】:

使用'split()'函数!

val s="MSH|^~\\&|RQ|BIN|SMS|BIN|2019||ORU^R01|120330003918|J|2.2"

// WRONG
s.replaceAll("\\|",",")(5)   
res3: Char = ~

// RIGHT
s.split("\\|")(5) 
res4: String = BIN

【讨论】:

  • 如果我想提取多个列,比如我想要 BIN 2019 作为我的结果,我该如何实现?
  • @Captcha : s.split("\\|") 返回一个数组。如果你需要像 BIN 2019 这样的东西,我会在上面提到的代码中说 s.split("\\|").map(tuple => (tuple(5),(tuple(6))
  • @Rakshith 我按照你说的现在我得到的结果是(BIN,20121009151949)我怎样才能删除括号“()”
  • @Captcha :所以基本上返回类型是一个 tuple 。所以如果你说类似 val tokenised :(String,String)= s.split("\\|").map(tuple => (tuple(5),tuple(6)) 。所以 tokenised._1 会返回给你BIN 和 tokenised._2 将返回 2019。如果你希望它们成为一个字符串,那么你可以连接 tokenised._1 和 tokenised._2
【解决方案3】:

在 spark 2.0 中,您现在有了 csv 阅读器,因此您可以按如下方式简单地加载 csv

val baseDS=spark.read.option("header","true").csv(filePath)
  baseDS.show()

您可以通过简单的名称来选择列,如下所示

val selectCol = baseDS.select(ColumnName)

【讨论】:

    猜你喜欢
    • 2019-01-12
    • 1970-01-01
    • 1970-01-01
    • 2023-02-26
    • 1970-01-01
    • 2017-09-24
    • 1970-01-01
    • 2019-05-20
    • 2019-05-01
    相关资源
    最近更新 更多