[Title]: Issue with parsing JSON data stored as a string in a Scala variable
[Posted]: 2016-12-17 00:33:10
[Question]:

Environment: Spark 1.6, Scala

Through an API call, I use curl from Scala to store JSON data in the variable "rawdata", and I save the variable's contents as a file in HDFS. Now I want to parse the data and create a table directly from the variable (rawdata), since it already holds the data as a string, rather than reading the saved file back from HDFS. I tried the approach below but get an error. I am using a scheduler to fetch updated records at 30-second intervals. Is there a better way to achieve my goal?

import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.sys.process._

import akka.actor.ActorSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ConnTest extends App {

  val conf = new SparkConf()
  val sc = new SparkContext(conf.setAppName("Spark Ingestion").setMaster("local[*]"))
  val hivecontext = new HiveContext(sc)

  val run_id = java.time.LocalDate.now
  // "yyyy" is the calendar year; "YYYY" is the week-based year and rolls
  // over early around New Year, producing wrong filenames
  val format = new SimpleDateFormat("yyyyMMddHHmmss")

  val actorSystem = ActorSystem()
  val scheduler = actorSystem.scheduler
  val task = new Runnable {
    def run() {
      val dt = format.format(Calendar.getInstance().getTime())
      println(dt)

      val writeFilePath = "vlinkAlarm_" + dt + ".json"
      // Wrap the JSON string in a one-element list; parallelizing the bare
      // String would give an RDD[Char]
      val rdd1 = sc.parallelize(jsonWriter(writeFilePath) :: Nil) // ERROR: No Spark Context
      rdd1.foreach(println)
    }
  }

  implicit val executor = actorSystem.dispatcher

  scheduler.schedule(
    initialDelay = Duration(5, TimeUnit.SECONDS),
    interval = Duration(30, TimeUnit.SECONDS),
    runnable = task)

  val uri = "hdfs://quickstart.cloudera:8020"
  val conf1 = new Configuration()
  conf1.set("fs.defaultFS", uri) // was conf.set(...), which targeted the SparkConf instead
  val fs = FileSystem.get(conf1)

  def jsonWriter(fileName: String): String = {
    println(fileName)
    val rawdata = "curl http://services.groupkt.com/state/get/USA/all".!!
    rawdata
  }
}
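
Aside from the Spark error, the timestamp pattern above has a subtle pitfall: in `SimpleDateFormat`, `YYYY` is the week-based year, not the calendar year, so filenames generated in the last days of December can carry next year's date. A minimal demonstration (plain Scala, no Spark involved; `WeekYearDemo` is just an illustrative name):

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, GregorianCalendar, Locale}

object WeekYearDemo extends App {
  // With US week rules, Dec 31, 2010 falls in the same week as Jan 1, 2011,
  // so the week-based year "YYYY" has already rolled over to 2011.
  val date = new GregorianCalendar(2010, Calendar.DECEMBER, 31).getTime
  println(new SimpleDateFormat("yyyyMMdd", Locale.US).format(date)) // 20101231
  println(new SimpleDateFormat("YYYYMMdd", Locale.US).format(date)) // 20111231
}
```

Using lowercase `yyyy` in the pattern avoids the problem.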

=== Error ===

16/12/13 15:53:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

20161213155329
vlinkAlarm_20161213155329.json
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 44013    0 44013    0     0  68060      0 --:--:-- --:--:-- --:--:-- 68026
[ERROR] [12/13/2016 15:53:30.532] [default-akka.actor.default-dispatcher-2] [TaskInvocation] Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

After a while:

The currently active SparkContext was created at:

(No active SparkContext.)
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

My JSON data looks like:

{
    "results": [{
        "id": "6475867",
        "date": "2016-12-09",
        "time": "16:50:49",
        "varbinds": ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
    }, {
        "id": "6475866",
        "date": "2016-12-09",
        "time": "16:50:05",
        "varbinds": ["4", "192.255.54.154:1136", "CASAH 4", "52", "6", "", "", "", "", "", "", "", "", "", "", ""]
    }]
}

Thanks,
Hussain

[Question comments]:

    Tags: json scala apache-spark hdfs scheduled-tasks


    [Solution 1]:

    I couldn't fully resolve the issue, but found a workaround: I simply used a thread in a loop instead of the scheduler. The following code parses the JSON stored as a string in the variable rawdata.

    for (i <- 1 to 10) {
      val url = "http://10.51.253.11:8082/vistalink/1/alarm.json?isVerbose=true"
      val rawdata = scala.io.Source.fromURL(url).mkString

      // Wrap the single JSON string in an RDD and let Spark infer the schema
      val RDDFromString = sc.parallelize(rawdata :: Nil)
      val DF = hivecontext.read.json(RDDFromString)

      DF.printSchema()
      Thread.sleep(1000 * 30)
    }
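
The error log shows the SparkContext had already been stopped by the time the first scheduled tick fired, so the scheduler approach likely fails because nothing keeps the driver alive after the `App` body returns. If a scheduler is still preferred over a sleep loop, a plain JDK scheduled executor avoids depending on the actor system's lifecycle; this is a sketch, not from the original post, and `Polling` with its parameters is a hypothetical name:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object Polling {
  // Runs `body` at a fixed rate on a non-daemon worker thread; as long as
  // the executor is not shut down, the JVM stays alive between ticks, so a
  // SparkContext created in main is not torn down between runs.
  def start(initialDelayMs: Long, intervalMs: Long)(body: () => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = body() },
      initialDelayMs, intervalMs, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

In the original program this would replace the Akka scheduler: call `Polling.start(5000, 30000)(...)` with the fetch-and-parse body, and shut the executor down (and only then stop the SparkContext) when polling should end.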
    

    Thanks,
    Hussain

    [Comments]:
