【问题标题】:How can I use spark-sql to read and process a JSON event from kafka?如何使用 spark-sql 从 kafka 读取和处理 JSON 事件?
【发布时间】:2018-11-06 16:34:25
【问题描述】:

详细说明:希望使用结构化 spark 流实现 scala 代码、DataFrame 从 Kafka 读取 JSON 事件,并使用 spark-sql 操作数据/列并将其写入 hive?

使用 scala 2.11/spark 2.2

我知道创建连接很简单:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

如何处理 JSON 事件?假设所有事件都具有相同的架构,我是否必须提供架构,如果是,它是如何完成的,如果有办法推断架构,那是如何完成的?

如果我理解正确,然后创建一个 tempView,我如何在该视图上运行类似 sql 的查询?

您的系统需要编辑:自动系统将此帖子标记为重复,但事实并非如此。在链接的问题中,OP 要求解决他现有代码的问题,而一个(有效的)答案解决了 JSON 反序列化的问题。 如上所述,我的问题有所不同。如果我的问题不清楚,请具体询问,我会尝试进一步澄清。谢谢。

【问题讨论】:

标签: json scala apache-spark apache-kafka spark-streaming


【解决方案1】:

假设所有事件都具有相同的架构,我是否必须提供 模式,如果是这样,它是如何完成的,还有,如果有一种方法来推断 schema 是怎么做的?

更好的一面是,如果您知道架构,请提供架构。您可以按如下方式创建架构:

val schema = new StructType().add( "Id",IntegerType).add( "name",StringType)

然后从Kafka中读取数据并反序列化如下:

val data_df = df.select(from_json(col("value"), schema).as("data")).select("data.*")

您可以从 data_df 创建一个临时视图。

data_df.createOrReplaceTempView("data_df")

现在您可以使用 spark sql 查询视图

spark.sql("select * from data_df").show()

【讨论】:

    猜你喜欢
    • 2018-11-08
    • 1970-01-01
    • 2019-11-04
    • 2018-02-25
    • 2021-06-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多