【发布时间】:2017-04-17 16:10:28
【问题描述】:
我有一个看起来像这样的 pairRDD
(1, {"id":1, "picture": "url1"})
(2, {"id":2, "picture": "url2"})
(3, {"id":3, "picture": "url3"})
...
第二个元素是一个字符串,我从http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl 的函数get() 中得到。这是那个函数:
@throws(classOf[java.io.IOException])
@throws(classOf[java.net.SocketTimeoutException])
def get(url: String,
connectTimeout: Int = 5000,
readTimeout: Int = 5000,
requestMethod: String = "GET") =
{
import java.net.{URL, HttpURLConnection}
val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
connection.setConnectTimeout(connectTimeout)
connection.setReadTimeout(readTimeout)
connection.setRequestMethod(requestMethod)
val inputStream = connection.getInputStream
val content = io.Source.fromInputStream(inputStream).mkString
if (inputStream != null) inputStream.close
content
}
现在我想将该字符串转换为 json 以从中获取图片 url。 (来自https://stackoverflow.com/a/38271732/1456026)
val step2 = pairRDD_1.map({case(x,y)=>{
val jsonStr = y
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)
(x,y("picture"))
}})
但我不断得到
线程“主”org.apache.spark.SparkException 中的异常:任务不是 可序列化
当我打印出前 20 个元素并尝试在 .map 之外手动将字符串转换为 json 时,它起作用了。
val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}"""))
val df = sqlContext.read.json(rdd)
println(df)
>>>[id: string, picture: string]
如何在 .map 内的 spark/scala 中将字符串转换为 json?
【问题讨论】:
标签: json string scala apache-spark