【问题标题】:Parsing json in spark在火花中解析 json
【发布时间】:2017-05-18 06:17:28
【问题描述】:

我在 spark 作业中使用 json scala 库从本地驱动器解析 json:

val requestJson=JSON.parseFull(Source.fromFile("c:/data/request.json").mkString)
    val mainJson=requestJson.get.asInstanceOf[Map[String,Any]].get("Request").get.asInstanceOf[Map[String,Any]]
    val currency=mainJson.get("currency").get.asInstanceOf[String]

但是当我尝试通过指向 hdfs 文件位置来使用相同的解析器时它不起作用:

val requestJson=JSON.parseFull(Source.fromFile("hdfs://url/user/request.json").mkString)

并给我一个错误:

java.io.FileNotFoundException: hdfs:/localhost/user/request.json (No such file or directory)
  at java.io.FileInputStream.open0(Native Method)
  at java.io.FileInputStream.open(FileInputStream.java:195)
  at java.io.FileInputStream.<init>(FileInputStream.java:138)
  at scala.io.Source$.fromFile(Source.scala:91)
  at scala.io.Source$.fromFile(Source.scala:76)
  at scala.io.Source$.fromFile(Source.scala:54)
  ... 128 elided

如何使用 Json.parseFull 库从 hdfs 文件位置获取数据?

谢谢

【问题讨论】:

  • 您应该提供类似hdfs://cluster_name/path/to/file' 的`hdfs` 位置,或者简单地提供类似/path/to/file/ 的目录名称。请尝试让我知道我会相应地回答。
  • 是的,我尝试将 hdfs 路径提供给 Source.fromFile api,但不起作用
  • 你能发布错误日志吗?
  • java.io.FileNotFoundException: hdfs:/hdfsurl/user/request.json(没有这样的文件或目录)在 java.io.FileInputStream.open0(本机方法)在 java.io.FileInputStream。 open(FileInputStream.java:195) 在 java.io.FileInputStream.(FileInputStream.java:138) 在 scala.io.Source$.fromFile(Source.scala:91) 在 scala.io.Source$.fromFile (Source.scala:76) at scala.io.Source$.fromFile(Source.scala:54) ... 128 省略

标签: scala apache-spark apache-spark-sql apache-spark-2.0


【解决方案1】:

Spark 确实内置了对 JSON 文档解析的支持,这将在 spark-sql_${scala.version} jar 中提供。

在 Spark 2.0+ 中:

import org.apache.spark.sql.SparkSession 

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate

val df = spark.read.format("json").json("json/file/location/in/hdfs")

df.show()

使用df 对象,您可以对其执行所有支持的SQL 操作,并且它的数据处理将分布在节点之间requestJson 只会在单机上计算。

Maven 依赖项

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>

编辑:(根据从 hdfs 读取文件的评论)

val hdfs = org.apache.hadoop.fs.FileSystem.get(
             new java.net.URI("hdfs://ITS-Hadoop10:9000/"), 
             new org.apache.hadoop.conf.Configuration()
           )
val path=new Path("/user/zhc/"+x+"/")
val t=hdfs.listStatus(path)
val in =hdfs.open(t(0).getPath)
val reader = new BufferedReader(new InputStreamReader(in))
var l=reader.readLine()

code credits: from another SO question

Maven 依赖项:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.2</version> <!-- you can change this as per your hadoop version -->
</dependency>

【讨论】:

  • json 文件只有几个 kbs,所以我想避免在这种情况下使用数据帧并在 dirver 上而不是在所有工作人员上解析 json
  • 我们可以通过将代码更改为master("local[1]") 来限制工人为一名。当您在local 模式下运行时,工作人员和驱动程序将在同一台机器上。
  • 您可以使用df.collect() 将整个数据获取到驱动程序。
  • 这项工作是解析一个小文件,然后是一个更大的镶木地板,并将更大的文件与较小的文件连接起来,较小的文件不需要运行我的意思的分布式代码。master 仍然应该在所有文件上执行内核可用,因为它必须处理大型镶木地板文件
  • 然后您必须使用 HDFS api 加载 json 文件作为位于 hdfs 中的文件。加载数据并收集到驱动程序很好。由于文件很小,它只会创建一个分区。
【解决方案2】:

在 spark 2.0 中要容易得多

val df = spark.read.json("json/file/location/in/hdfs")
df.show()

【讨论】:

  • 它为此生成了一个 map reduce 作业。对于一个小的 json,它是一个矫枉过正,因此我想用 scala 执行它
【解决方案3】:

可以在 Spark 中使用以下命令从 HDFS 读取文件: val jsonText = sc.textFile("hdfs://url/user/request.json").collect.mkString("\n")

【讨论】:

    猜你喜欢
    • 2014-10-27
    • 2015-01-08
    • 2016-08-13
    • 2020-07-29
    • 2022-01-08
    • 2019-04-10
    • 1970-01-01
    • 2021-11-25
    • 2016-07-09
    相关资源
    最近更新 更多