【问题标题】:Read Data from kafka topic into spark dataframe将 kafka 主题中的数据读入 spark 数据框
【发布时间】:2021-01-16 01:42:36
【问题描述】:
private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(sparkSqlMysql.class);

private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
        .getOrCreate();

public static void main(String[] args) {
    // JDBC connection properties


    // Load MySQL query result as Dataset

    Dataset<Row> df = sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "SqlMessages").load();

我想做一些事情,我可以从我的 kafka 主题中读取我的 spark SQL 中的数据,但不能这样做。

谁能指导我将我的数据从 kafka 主题转换为 spark SQL 吗?

我能做到的事情

 Dataset<Row> schoolData = sparkSession.sql("select * from Schools");

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-streaming


    【解决方案1】:

    今天正在做类似的事情。从头开始消耗整个主题,转换为 DataFrame 并保存为 Parquet 表。您可以从 Scala 改编我的代码,思路应该很清楚。

    val topic = "topic_bla_bla"
    val brokers = "some_kafka_broker:9092"
    val kafkaDF = spark.read.format("kafka").option("kafkaConsumer.pollTimeoutMs", "20000").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", brokers).option("subscribe", topic).load()
    val jsonDF = kafkaDF.selectExpr("CAST(value AS STRING)")
    val finalDF = spark.read.option("mode", "PERMISSIVE").json(jsonDF.as[String])
    finalDF.registerTempTable("wow_table")
    //OR
    finalDF.write.format("parquet").saveAsTable("default.wow_table")
    spark.sql("select * from wow_table")
    

    【讨论】:

    • 你能用 Java 做吗?
    • @AdityaVerma 你应该可以。想法是通过 spark.read.format("kafka") 从 kafka 读取数据,选择并将值转换为字符串,将 DataFrame 转换为 Dataset 以便您可以执行 spark.read.option("mode", "PERMISSIVE").json (...)。
    猜你喜欢
    • 2023-03-19
    • 2020-09-18
    • 2021-06-11
    • 1970-01-01
    • 2021-05-23
    • 1970-01-01
    • 2021-04-17
    • 2020-08-28
    • 1970-01-01
    相关资源
    最近更新 更多