【问题标题】:spark/scala string to json inside mapspark/scala 字符串到地图内的 json
【发布时间】:2017-04-17 16:10:28
【问题描述】:

我有一个看起来像这样的 pairRDD

(1, {"id":1, "picture": "url1"})
(2, {"id":2, "picture": "url2"})
(3, {"id":3, "picture": "url3"})
...

第二个元素是一个字符串,我从http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl 的函数get() 中得到。这是那个函数:

@throws(classOf[java.io.IOException])
@throws(classOf[java.net.SocketTimeoutException])
def get(url: String,
        connectTimeout: Int = 5000,
        readTimeout: Int = 5000,
        requestMethod: String = "GET") =
{
    import java.net.{URL, HttpURLConnection}
    val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
    connection.setConnectTimeout(connectTimeout)
    connection.setReadTimeout(readTimeout)
    connection.setRequestMethod(requestMethod)
    val inputStream = connection.getInputStream
    val content = io.Source.fromInputStream(inputStream).mkString
    if (inputStream != null) inputStream.close
    content
}

现在我想将该字符串转换为 json 以从中获取图片 url。 (来自https://stackoverflow.com/a/38271732/1456026

val step2 = pairRDD_1.map({case(x,y)=>{
val jsonStr = y
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)
(x,y("picture"))
}})

但我不断得到

线程“主”org.apache.spark.SparkException 中的异常:任务不是 可序列化

当我打印出前 20 个元素并尝试在 .map 之外手动将字符串转换为 json 时,它起作用了。

val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}"""))
val df = sqlContext.read.json(rdd)
println(df)
>>>[id: string, picture: string]

如何在 .map 内的 spark/scala 中将字符串转换为 json?

【问题讨论】:

    标签: json string scala apache-spark


    【解决方案1】:

    通常当您看到此消息时,这是因为您在 map 函数(读取匿名函数)中使用了在其外部定义且无法序列化的资源。

    在集群模式下运行,匿名函数将完全在不同的机器上运行。在这台单独的机器上,您的应用程序的一个新实例被实例化,它的状态(变量/值/等)由驱动程序序列化并发送到新实例的数据设置。如果您的匿名函数是一个闭包(即利用其范围之外的变量),那么这些资源必须是可序列化的,以便发送到工作节点。

    例如,map 函数可能会尝试使用数据库连接来获取 RDD 中每条记录的一些信息。该数据库连接仅在创建它的主机上有效(当然,从网络的角度来看),这通常是驱动程序,因此它不能从不同的主机序列化、发送和使用。在此特定示例中,您将执行 mapPartitions() 以实例化来自 worker 本身的数据库连接,然后 map() 该分区中的每个记录以查询数据库。

    如果没有您的完整代码示例,我无法提供更多帮助,看看哪些潜在值或变量无法序列化。

    【讨论】:

      【解决方案2】:

      您不能在分布式操作中使用SparkContext。在上面的代码中,您无法在pairRDD_1 上的map 操作中访问 SparkContext。

      考虑使用 JSON 库来执行转换。

      【讨论】:

        【解决方案3】:

        答案之一是使用 json4s 库。 来源:http://muster.json4s.org/docs/jawn_codec.html

        //case class defined outside main()
        case class Pictures(id: String, picture: String)
        
        // import library
        import muster._
        import muster.codec.jawn._
        
        // here all the magic happens 
        val json_read_RDD = pairRDD_1.map({case(x,y) =>
              {
                  val json_read_to_case_class = JawnCodec.as[Pictures](y)
                  (x, json_read_to_case_class.picture)
            }})
        
        // add to build.sbt
        libraryDependencies ++= Seq(
        "org.json4s" %% "muster-codec-json" % "0.3.0",
        "org.json4s" %% "muster-codec-jawn" % "0.3.0")
        

        感谢 Travis Hegner,他解释了为什么原始代码不起作用 并向 Anton Okolnychyi 寻求使用 json 库的建议。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2018-08-18
          • 2023-02-26
          • 2016-03-09
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-05-20
          相关资源
          最近更新 更多