【问题标题】:How to convert JSON Dataset to DataFrame in Spark Structured Streaming [duplicate]如何在 Spark Structured Streaming 中将 JSON 数据集转换为 DataFrame [重复]
【发布时间】:2018-02-05 18:22:56
【问题描述】:

我正在使用 Spark 结构化流处理来自 Kafka 的数据。我将每条消息转换为 JSON。然而,spark 需要一个 explicit 模式来从 JSON 中获取列。使用DStreams 的 Spark Streaming 允许进行以下操作

spark.read.json(spark.createDataset(jsons))

其中jsonsRDD[String]。 在 Spark Structured Streaming 的情况下类似的方法

df.sparkSession.read.json(jsons)

jsonsDataSet[String]

导致以下异常

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我假设read 触发执行而不是start,但是有没有办法绕过这个?

【问题讨论】:

    标签: apache-spark spark-streaming spark-structured-streaming


    【解决方案1】:

    要从 Kafka 上的 JSON 流式传输到 DataFrame,您需要执行以下操作:

      case class Colour(red: Int, green: Int, blue: Int)
    
      val colourSchema: StructType = new StructType()
        .add("entity", "int")
        .add("security", "int")
        .add("client", "int")
    
      val streamingColours: DataFrame = spark
        .readStream
        .format("kafka")
        .load()
        .selectExpr("CAST(value AS STRING)")
        .select(from_json($"value", colourSchema))
    
      streamingColours
        .writeStream
        .outputMode("complete")
        .format("console")
        .start()
    

    这应该会创建一个流式 DataFrame,并在控制台上显示从 Kafka 读取的结果。

    我不相信可以对流数据集使用“推断架构”。这是有道理的,因为推断模式会查看大量数据来确定类型是什么等。对于流式数据集,可以通过处理第一条消息推断出的模式可能与第二条消息的模式不同,等等.Spark 需要一个模式来处理 DataFrame 的所有元素。

    我们过去做的是用 Spark 的批处理和使用 infer schema 来处理一批 JSON 消息。然后导出该架构以用于流式数据集。

    【讨论】:

    • 我熟悉显式模式方法。但是我想处理我不知道 JSON 模式的情况。
    • 啊,我明白了。我会更新我的答案。短篇小说,这是不可能的。
    猜你喜欢
    • 2016-05-16
    • 1970-01-01
    • 2019-05-14
    • 1970-01-01
    • 2021-03-26
    • 1970-01-01
    • 1970-01-01
    • 2018-06-04
    • 1970-01-01
    相关资源
    最近更新 更多