【Posted】:2016-12-17 00:33:10
【Problem description】:
Environment: Spark 1.6, Scala
Via an API call with curl from Scala, I store JSON data in a variable `rawdata`, and I save the variable's contents as a file in HDFS. Now I want to parse the data and create a table directly from the variable (`rawdata`), since it already holds the data as a string, rather than reading the saved file back from HDFS. I tried the approach below but get an error. I am using a scheduler to fetch updated records at a 30-second interval. Is there a better way to achieve my goal?
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.sys.process._
import akka.actor.ActorSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ConnTest extends App {
  val conf = new SparkConf()
  val sc = new SparkContext(conf.setAppName("Spark Ingestion").setMaster("local[*]"))
  val hivecontext = new HiveContext(sc)
  var run_id = java.time.LocalDate.now
  val format = new SimpleDateFormat("yyyyMMddHHmmss") // "yyyy", not "YYYY" (week year)
  val actorSystem = ActorSystem()
  val scheduler = actorSystem.scheduler
  val task = new Runnable {
    def run() {
      val dt = format.format(Calendar.getInstance().getTime())
      println(dt)
      val writeFilePath = "vlinkAlarm_" + dt + ".json"
      val rdd1 = sc.parallelize(Seq(jsonWriter(writeFilePath))) // ERROR: No Spark Context
      rdd1.foreach(println)
    }
  }
  implicit val executor = actorSystem.dispatcher
  scheduler.schedule(
    initialDelay = Duration(5, TimeUnit.SECONDS),
    interval = Duration(30, TimeUnit.SECONDS),
    runnable = task)
  val uri = "hdfs://quickstart.cloudera:8020"
  val conf1 = new Configuration()
  conf1.set("fs.defaultFS", uri) // was conf.set — must be set on the Hadoop Configuration
  val fs = FileSystem.get(conf1)

  def jsonWriter(fileName: String): String = {
    println(fileName)
    val rawdata = "curl http://services.groupkt.com/state/get/USA/all".!!
    rawdata
  }
}
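One subtle bug in the snippet above, separate from the Spark error: the timestamp pattern uses "YYYY" (week year) where "yyyy" (calendar year) is intended. Around New Year the two can differ, which would silently misname the output files. A small stdlib-only demonstration (Locale.US is pinned so the result is deterministic):

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, GregorianCalendar, Locale}

// 2015-12-31 falls in the first week of 2016 under US week rules,
// so the week-year pattern "YYYY" reports 2016 while "yyyy" reports 2015.
val dec31 = new GregorianCalendar(2015, Calendar.DECEMBER, 31).getTime
val weekYear     = new SimpleDateFormat("YYYY", Locale.US).format(dec31)
val calendarYear = new SimpleDateFormat("yyyy", Locale.US).format(dec31)
```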
==== Error ====
16/12/13 15:53:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
20161213155329
vlinkAlarm_20161213155329.json
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 44013 0 44013 0 0 68060 0 --:--:-- --:--:-- --:--:-- 68026
[ERROR] [12/13/2016 15:53:30.532] [default-akka.actor.default-dispatcher-2] [TaskInvocation] Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
After a while:
The currently active SparkContext was created at:
(No active SparkContext.)
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
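The stack trace above is the real blocker: with `extends App`, the object's body ends right after `scheduler.schedule(...)` returns, the driver starts shutting down, and Spark's shutdown hook stops the SparkContext before the 30-second task ever touches `sc`. The usual fix is to keep the main thread alive for as long as the scheduled job should run. A minimal stdlib-only sketch of that shape (a `ScheduledExecutorService` stands in for the Akka scheduler, a counter stands in for the Spark work, and the 100 ms interval is only to keep the demo fast):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

val latch = new CountDownLatch(1)
val pool  = Executors.newSingleThreadScheduledExecutor()
val ticks = new AtomicInteger(0)

pool.scheduleAtFixedRate(new Runnable {
  def run(): Unit = {
    // the jsonWriter(...) + sc.parallelize work would go here;
    // while the main thread is blocked below, sc stays alive
    if (ticks.incrementAndGet() >= 3) latch.countDown()
  }
}, 0, 100, TimeUnit.MILLISECONDS)

latch.await()   // block the main thread until shutdown is requested
pool.shutdown()
```

With the Akka setup from the question, the equivalent is to block after `scheduler.schedule(...)` (for example by awaiting the actor system's termination) instead of letting the `App` body fall through.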
My JSON data looks like:
{
    "results": [{
        "id": "6475867",
        "date": "2016-12-09",
        "time": "16:50:49",
        "varbinds": ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
    }, {
        "id": "6475866",
        "date": "2016-12-09",
        "time": "16:50:05",
        "varbinds": ["4", "192.255.54.154:1136", "CASAH 4", "52", "6", "", "", "", "", "", "", "", "", "", "", ""]
    }]
}
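As for the actual question — building a table straight from the string in `rawdata` without the HDFS round-trip — in Spark 1.6 `read.json` accepts an `RDD[String]` with one JSON document per element, so the variable can be parallelized and parsed directly. A hedged sketch (the table name `alarms` and the inlined single-document `rawdata` are my assumptions, not from the post):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.explode

val sc = new SparkContext(
  new SparkConf().setAppName("json-from-string").setMaster("local[*]"))
val hivecontext = new HiveContext(sc)

// rawdata would come from jsonWriter(...); inlined here for the sketch
val rawdata = """{"results":[{"id":"6475867","date":"2016-12-09","time":"16:50:49"}]}"""

// read.json takes an RDD[String]: one JSON document per element
val df = hivecontext.read.json(sc.parallelize(Seq(rawdata)))

// flatten the results array into one row per record
val flat = df.select(explode(df("results")).as("r"))
             .select("r.id", "r.date", "r.time")
flat.registerTempTable("alarms") // queryable via hivecontext.sql("select ... from alarms")
```

A plain `SQLContext` works the same way here if nothing Hive-specific is needed.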
Thanks,
Hussain
【Question discussion】:
Tags: json scala apache-spark hdfs scheduled-tasks