Posted: 2021-05-18 18:18:01
Problem description:
I am streaming several Kafka topics and creating tables from them. How can I run Spark SQL queries against these tables as new data arrives on the streams?
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.Trigger;

public class SparkApp {
    public static void main(String[] args) throws InterruptedException {
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark3App")
                .config("spark.master", "local")
                .getOrCreate();

        // Stream topic_1, pull the "data" JSON field out of each message,
        // and flatten it using the Regions.data schema (a StructType defined elsewhere).
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic_1")
                .load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .select(functions.get_json_object(functions.col("value"), "$.data").alias("data"))
                .withColumn("data", functions.from_json(functions.col("data"), Regions.data))
                .select("data.*");

        // Expose the stream as the in-memory table "Regions".
        df.writeStream()
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .outputMode("update")
                .format("memory")
                .queryName("Regions")
                .start();

        // Same pipeline for topic_2, flattened with the Projects.data schema.
        Dataset<Row> df1 = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic_2")
                .load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .select(functions.get_json_object(functions.col("value"), "$.data").alias("data"))
                .withColumn("data", functions.from_json(functions.col("data"), Projects.data))
                .select("data.*");

        // Exposed as the in-memory table "Projects".
        df1.writeStream()
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .outputMode("update")
                .format("memory")
                .queryName("Projects")
                .start();

        // This runs only once, at startup, before the memory tables have received any data.
        spark.sql("SELECT Projects.project_name, Regions.region_name FROM Projects "
                + "JOIN Regions ON Regions.region_name = Projects.region_name")
                .show();
    }
}
How do I keep the application running continuously so that it listens to the streams and runs the SQL query whenever new data arrives? The end goal is to write the output of the SQL query to another Kafka topic.
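A common pattern for this, shown below as a minimal sketch rather than a verified answer to this exact setup, is to drop the memory sinks entirely: register the two streaming DataFrames as temp views, run spark.sql over them to get a streaming join, write that stream to Kafka, and block the main thread with awaitAnyTermination(). The output topic name and checkpoint path are hypothetical placeholders.

// Sketch only: assumes the df/df1 definitions above, minus the memory sinks.
// "output_topic" and "/tmp/spark-checkpoint" are hypothetical placeholders.
df.createOrReplaceTempView("Regions");
df1.createOrReplaceTempView("Projects");

// spark.sql over streaming views returns another *streaming* Dataset;
// the inner equi-join is recomputed incrementally as new records arrive.
Dataset<Row> joined = spark.sql(
        "SELECT Projects.project_name, Regions.region_name FROM Projects "
      + "JOIN Regions ON Regions.region_name = Projects.region_name");

// The Kafka sink requires a string or binary "value" column,
// a target topic, and a checkpoint location.
joined.selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "output_topic")
        .option("checkpointLocation", "/tmp/spark-checkpoint")
        .outputMode("append") // stream-stream inner joins run in append mode
        .start();

// Keep the JVM alive so the queries keep running.
// Note: awaitAnyTermination() throws StreamingQueryException,
// so main() must declare or catch it.
spark.streams().awaitAnyTermination();

One caveat with this sketch: without watermarks the join state grows without bound, so in a long-running job you would typically add withWatermark() on both sides of the join.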
Tags: java scala apache-spark apache-kafka-streams