【问题标题】:Read file path from Kafka topic and then read file and write to DeltaLake in Structured Streaming从 Kafka 主题读取文件路径,然后在结构化流中读取文件并写入 DeltaLake
【发布时间】:2021-04-22 22:12:41
【问题描述】:

我有一个用例,其中存储在 s3 中的 json 记录的文件路径以 kafka 的形式出现 卡夫卡中的消息。我必须使用 spark 结构化流处理数据。

我认为的设计如下:

  1. 在 kafka Spark 结构化流中,读取包含数据路径的消息。
  2. 收集驱动程序中的消息记录。 (消息很小)
  3. 从数据位置创建数据框。
kafkaDf.select($"value".cast(StringType))
       .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
  //rough code
  //collect to driver
  val records = batchDf.collect()
  //create dataframe and process
  records foreach((rec: Row) =>{
    println("records:######################", rec.toString())
    val path = rec.getAs[String]("data_path")
    val dfToProcess = spark.read.json(path)
    ....
  })
}

我想知道您的意见,如果这种方法可以吗?特别是如果在调用 collect 后创建 Dataframe 有问题。 如果有更好的方法,请告诉我。

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming delta-lake


    【解决方案1】:

    你的想法很好用。

    实际上,必须将您的 Dataframe 收集到驱动程序。否则,您无法通过在每个执行器上调用 SparkSession 来创建分布式数据集。如果没有collect,您最终会遇到 NullPointerException。

    我稍微重写了您的代码片段,并实现了如何将您的 Dataframe 写入增量表的部分(基于您的其他 question)。此外,我使用的是Dataset[String] 而不是Dataframe[Row],这让生活更轻松。

    使用带有 delta-core 0.7.0 的 Spark 3.0.1 可以正常工作。例如,我的测试文件看起来像

    {"a":"foo1","b":"bar1"}
    {"a":"foo2","b":"bar2"}
    

    我将该文件的位置发送到名为“test”的 Kafka 主题,并使用以下代码应用以下代码来解析文件并将其列(基于给定架构)写入增量表:

      val spark = SparkSession.builder()
        .appName("KafkaConsumer")
        .master("local[*]")
        .getOrCreate()
    
      val jsonSchema = new StructType()
        .add("a", StringType)
        .add("b", StringType)
    
      val deltaPath = "file:///tmp/spark/delta/test"
    
      import spark.implicits._
      val kafkaDf = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        .selectExpr("CAST(value AS STRING) as data_path")
        .as[String]
    
      kafkaDf.writeStream.foreachBatch((batchDf:Dataset[String], batchId:Long) => {
        // collect to driver
        val records = batchDf.collect()
    
        // create dataframe based on file location and process and write to Delta-Lake
        records.foreach((path: String) => {
          val dfToProcess = spark.read.schema(jsonSchema).json(path)
          dfToProcess.show(false) // replace this line with your custom processing logic
          dfToProcess.write.format("delta").save(deltaPath)
        })
      }).start()
    
      spark.streams.awaitAnyTermination()
    

    show 调用的输出符合预期:

    +----+----+
    |a   |b   |
    +----+----+
    |foo1|bar1|
    |foo2|bar2|
    +----+----+
    

    并且数据已作为增量表写入通过deltaPath指定的位置

    /tmp/spark/delta/test$ ll
    total 20
    drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
    drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
    drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
    -rw-r--r-- 1 x x  595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
    -rw-r--r-- 1 x x   16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc
    
    

    【讨论】:

    • 抱歉堆栈不允许编辑评论。如果您可以就资源方面发表您的看法。我的意思是,我们本质上是在流式微批处理中创建许多独立的 Spark 作业。而且它可能不会并行运行所有这些。这些工作中的资源如何共享?另一个它可能会在 Kafka 主题中产生滞后,因为下一个触发器将等待当前完成。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-25
    • 2021-07-27
    • 2015-06-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多