【发布时间】:2018-10-27 21:21:28
【问题描述】:
我是 Spark 流媒体的新手。我正在尝试使用本地 csv 文件进行结构化 Spark 流式传输。我在处理时遇到以下异常。
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///home/Teju/Desktop/SparkInputFiles/*.csv]
这是我的代码。
val df = spark
.readStream
.format("csv")
.option("header", "false") // Use first line of all files as header
.option("delimiter", ":") // Specifying the delimiter of the input file
.schema(inputdata_schema) // Specifying the schema for the input file
.load("file:///home/Teju/Desktop/SparkInputFiles/*.csv")
val filterop = spark.sql("select tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID,first(rssi_weightage(RSSI)) as RSSI_Weight from my_table where RSSI > -127 group by tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID order by Timestamp ASC")
val outStream = filterop.writeStream.outputMode("complete").format("console").start()
我创建了 cron 作业,因此每 5 分钟我将获得一个输入 csv 文件。我正在尝试解析 Spark 流。
【问题讨论】:
-
df与其他数据集filterop和outStream之间有什么关系?您不要在粘贴的代码中使用df。这是故意的吗?我会说代码不能按原样执行。缺少一些重要的东西。
标签: scala apache-spark spark-structured-streaming