[Title]: Kafka Spark Streaming: How to run a Spark SQL query on multiple tables created by Spark Streaming?
[Posted]: 2021-05-18 18:18:01
[Question]:

I am streaming multiple Kafka topics and creating tables from them. How can I run a Spark SQL query on these tables whenever new data arrives in the streams?

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.Trigger;

public class SparkApp {

    public static void main(String[] args) throws InterruptedException {
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark3App")
                .config("spark.master", "local")
                .getOrCreate();

        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic_1")
                .load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .select(functions.get_json_object(functions.col("value"),"$.data").alias("data"))
                .withColumn("data", functions.from_json(functions.col("data"), Regions.data)).select("data.*");

        df.writeStream()
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .outputMode("update")
                .format("memory")
                .queryName("Regions")
                .start();

        Dataset<Row> df1 = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic_2")
                .load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .select(functions.get_json_object(functions.col("value"),"$.data").alias("data"))
                .withColumn("data", functions.from_json(functions.col("data"), Projects.data)).select("data.*");

        df1.writeStream()
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .outputMode("update")
                .format("memory")
                .queryName("Projects")
                .start();

        spark.sql("Select Projects.project_name, Regions.region_name from Projects "
                + "Join Regions ON Regions.region_name = Projects.region_name")
                .show();
    }
}

How do I keep the application running continuously so that it listens to the streams and runs the SQL query whenever data arrives? The end goal is to write the output of the SQL query to another topic.

[Comments]:

    Tags: java scala apache-spark apache-kafka-streams


    [Solution 1]:

    You need to join the two streams first.

    val joinedDF = df.join(
      df1,
      Seq("region_name"),  // join key present in both streams
      "inner"
    )
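
    Note that an inner stream-stream join forces Spark to buffer past input from both sides as state, and without watermarks that state grows without bound. A minimal sketch of bounding the state, assuming each stream carries an event-time column (here called `event_time`, which is not part of the original schemas and shown only for illustration):

    ```scala
    // Sketch: limit join state with watermarks. "event_time" is an
    // assumed event-time column, not present in the question's schemas.
    val regionsWithWatermark  = df.withWatermark("event_time", "10 minutes")
    val projectsWithWatermark = df1.withWatermark("event_time", "10 minutes")

    val joinedDF = regionsWithWatermark.join(
      projectsWithWatermark,
      Seq("region_name"),
      "inner"
    )
    ```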
    

    Once that is done, execute the SQL query on each micro-batch:

    val query = joinedDF.writeStream
      .option("checkpointLocation", checkPointLocation)
      .foreachBatch { (microBatch: Dataset[Row], batchId: Long) =>
        microBatch.createOrReplaceTempView("final_table")

        val result = spark.sql("Select project_name, region_name from final_table")

        result.limit(5).show()
        // Additional logic
      }
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode("append")  // stream-stream joins support only append output mode
      .start()

    query.awaitTermination()
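
    Since the stated end goal is to publish the query result to another topic, the `foreachBatch` body can write each micro-batch with Spark's built-in Kafka sink. A hedged sketch (the topic name `output_topic` is an assumption, not from the question; the broker address is reused from the question's code):

    ```scala
    // Sketch: inside foreachBatch, write the batch result to a Kafka topic.
    // The Kafka sink expects string/binary "key" and "value" columns.
    result
      .selectExpr("CAST(region_name AS STRING) AS key",
                  "to_json(struct(*)) AS value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "output_topic")  // assumed output topic name
      .save()
    ```

    Because this runs inside `foreachBatch`, it uses the batch `write` API rather than `writeStream`, so each micro-batch is published as soon as the query and join for that batch complete.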
    

    [Comments]:
