【问题标题】:spark streaming writestream issue火花流写入流问题
【发布时间】:2020-06-21 23:52:17
【问题描述】:

我正在尝试根据文本文件中的 JSON 记录创建动态架构,因为每条记录都有不同的架构。以下是我的代码。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json, col}

object streamingexample {
  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    val df1 = spark.readStream.textFile("C:\\Users\\sheol\\Desktop\\streaming")
    val newdf11=df1
    val json_schema = newdf11.select("value").collect().map(x => x.get(0)).mkString(",")
    val df2 = df1.select(from_json($"value", schema_of_json(json_schema)).alias("value_new"))
    val df3 = df2.select($"value_new.*")
    df3.printSchema()
    df3.writeStream
      .option("truncate", "false")
      .format("console")
      .start()
      .awaitTermination()
     }
}

我收到以下错误。请帮助如何修复代码。我尝试了很多。想不通。

Error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

样本数据:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

【问题讨论】:

    标签: json schema spark-streaming


    【解决方案1】:

    您的代码中的这条语句会导致您的代码出现问题,正如您已经知道的那样。

    val json_schema = newdf11.select("value").collect().map(x => x.get(0)).mkString(",")
    

    您可以通过不同的方式获取 json 架构,如下所示...

    val dd: DataFrame =    spark.read.json("C:\\Users\\sheol\\Desktop\\streaming")
          dd.show()
    /** you can use  val df1 = spark.readStream.textFile(yourfile) also **/
    
          val json_schema = dd.schema.json;
          println(json_schema)
    
    

    结果:

    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    {"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}
    

    你可以进一步细化你的要求,我会留给你的

    【讨论】:

      【解决方案2】:

      发生此异常的原因是您在流启动之前尝试从流中访问数据。 df3.printSchema() 存在问题,请确保在流启动后调用此函数。

      【讨论】:

      • val json_schema = newdf11.select("value").collect().map(x => x.get(0)).mkString(",") 这是代码显示的行错误来自..
      • 希望您的问题得到解决。我们不允许在流开始之前访问数据。
      • 我现在明白了。但是有更好的方法来看待这个吗?我可能需要传入 JSON 数据的结构类型或字符串,以便在此处每次更新架构。
      • 在我以前的项目中,我已经这样做了转换为结构类型。 val staticInputDS = spark.readStream.option("header", "true").schema(Util.schema).csv("hdfs://master:9000/user/hadoop/test*.csv")
      猜你喜欢
      • 2015-10-13
      • 2019-10-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-02
      • 2016-02-07
      相关资源
      最近更新 更多