【Published】: 2020-06-21 23:52:17
【Question】:
I am trying to build a dynamic schema from the JSON records in a text file, because each record has a different schema. Here is my code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json, col}

object streamingexample {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    val df1 = spark.readStream.textFile("C:\\Users\\sheol\\Desktop\\streaming")
    val newdf11 = df1
    val json_schema = newdf11.select("value").collect().map(x => x.get(0)).mkString(",")
    val df2 = df1.select(from_json($"value", schema_of_json(json_schema)).alias("value_new"))
    val df3 = df2.select($"value_new.*")
    df3.printSchema()

    df3.writeStream
      .option("truncate", "false")
      .format("console")
      .start()
      .awaitTermination()
  }
}
I get the following error. How can I fix the code? I have tried many things and cannot figure it out.
Error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
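The exception most likely comes from `newdf11.select("value").collect()`: `collect()` is an action, and on a streaming Dataset Spark refuses to run any action except through `writeStream.start()`. A minimal sketch of one common workaround, assuming the input directory already contains at least one JSON file when the job starts: infer the schema with an ordinary batch read of the same directory, then hand it to the streaming reader. The object name `StreamingWithInferredSchema` and the helper `inferSchema` are hypothetical names chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object StreamingWithInferredSchema {

  // Hypothetical helper. A batch (non-streaming) read is allowed to scan the files,
  // so Spark infers one schema that merges the fields of every record it sees.
  def inferSchema(spark: SparkSession, dir: String): StructType =
    spark.read.json(dir).schema

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Assumption: this directory is not empty at startup; otherwise the batch
    // read fails because there is nothing to infer a schema from.
    val inputDir = "C:\\Users\\sheol\\Desktop\\streaming"
    val schema = inferSchema(spark, inputDir)

    // File-based streaming sources take the schema up front. Records that lack
    // a field (e.g. {"name":"Michael"} has no "age") get null in that column.
    val people = spark.readStream
      .schema(schema)
      .json(inputDir)

    people.printSchema()

    people.writeStream
      .option("truncate", "false")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

The schema is captured once, when the query starts, so a field that first appears in a later file is not added as a new column. Spark's `spark.sql.streaming.schemaInference` setting lets file sources infer the schema themselves at startup, with the same limitation.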
Sample data:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
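To keep the original `textFile` + `from_json` approach, one option is to infer the schema from a few representative records held in memory (here, the sample records above) instead of collecting from the stream. This is a sketch under the assumption that the samples cover every field you expect; `FromJsonWithSampleSchema` and `schemaFromSamples` are hypothetical names. `from_json` drops any field that is not in the schema, so fields missing from the samples will not show up in the output.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

object FromJsonWithSampleSchema {

  // Hypothetical helper: batch-read a few JSON strings so Spark infers one
  // schema covering the union of their fields (missing fields become nullable).
  def schemaFromSamples(spark: SparkSession, samples: Seq[String]): StructType = {
    import spark.implicits._
    spark.read.json(samples.toDS()).schema
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    // Assumption: these records are representative of everything the stream will contain.
    val samples = Seq(
      """{"name":"Michael"}""",
      """{"name":"Andy", "age":30}""",
      """{"name":"Justin", "age":19}"""
    )
    val schema = schemaFromSamples(spark, samples)

    // No action runs on the streaming Dataset here; the schema is a plain StructType.
    val df1 = spark.readStream.textFile("C:\\Users\\sheol\\Desktop\\streaming")
    val df3 = df1
      .select(from_json($"value", schema).alias("value_new"))
      .select($"value_new.*")

    df3.printSchema()

    df3.writeStream
      .option("truncate", "false")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```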
【Discussion】:
Tags: json schema spark-streaming