【问题标题】:Structured streaming schema from Kafka JSON - query error来自 Kafka JSON 的结构化流模式 - 查询错误
【发布时间】:2022-01-23 21:55:58
【问题描述】:

我正在使用 Spark 3.2 从 Kafka 2.12-3.0.0 获取 JSON 流。 解析 JSON 后,我在查询中收到错误。

Kafka 主题流式 JSON:

b'{"pmu_id": 2, "time": 1642771653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 49.99, "rocof": 1}'
b'{"pmu_id": 2, "time": 1642734653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 50.00, "rocof": -1}'

DataFrame 架构:

stream01Schema= StructType()\
    .add("pmu_id", ByteType())\
    .add("time", TimestampType()).add("stream_id", ByteType())
    .add("analog", StringType()).add("digital", ByteType()).add("frequency", FloatType()).add("rocof", ByteType())

构造一个从主题读取的流式 DataFrame:

stream01DF = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()
        .select(col("key").cast("string") from_json(col("value").cast("string").alias("pmudata"), stream01Schema))

打印结果架构:

root
 |-- key: string (nullable = true)
 |-- from_json(CAST(value AS STRING) AS pmudata): struct (nullable = true)    
 |    |-- pmu_id: byte (nullable = true)
 |    |-- time: timestamp (nullable = true)
 |    |-- stream_id: byte (nullable = true)
 |    |-- analog: string (nullable = true)
 |    |-- digital: byte (nullable = true)
 |    |-- frequency: float (nullable = true)
 |    |-- rocof: byte (nullable = true)

测试查询:

testQuery = stream01DF.groupBy("pmudata.rocof").count()    
testQuery.writeStream \
  .outputMode("complete") \
  .format("console") \
  .option("truncate", False) \
  .start() \
  .awaitTermination()

收到错误:

pyspark.sql.utils.AnalysisException: cannot resolve 'pmudata.rocof' given input columns: [from_json(CAST(value AS STRING) AS pmudata), key];

【问题讨论】:

  • AS pmudata 没有为该列正确别名。请显示您在哪里设置pmu214DF
  • 我已经更正了这个问题并更新了所有相关信息的答案。
  • 正如另一个问题中提到的,您是否尝试过from_json(col("value").cast("string"), stream01Schema).alias("pmudata")?换句话说,您想使用 then 别名的模式解析 json。这与我链接到的博客中的用法相同
  • 终于成功了!谢谢!请把它放在答案中,以便我接受。我将删除我之前的问题。

标签: apache-spark pyspark apache-kafka apache-spark-sql spark-structured-streaming


【解决方案1】:

您似乎正在寻找这个,因为您试图将 from_json() 列(检查您的括号)别名为一个名称,您可以稍后选择/分组。

from_json(col("value").cast("string"), stream01Schema).alias("pmudata")

完整的用法在this Databricks post中的端到端示例中

【讨论】:

    猜你喜欢
    • 2019-07-29
    • 2020-10-17
    • 2019-09-13
    • 2023-04-11
    • 1970-01-01
    • 2018-03-18
    • 2021-09-30
    • 2020-02-27
    • 2019-04-02
    相关资源
    最近更新 更多