【问题标题】:Apache Spark/Azure Data Lake Storage - Process the file exactly once, tag the file as processedApache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理
【发布时间】:2021-04-28 15:01:18
【问题描述】:

我有一个 Azure Data Lake Storage 容器,它充当 Apache Spark 处理 JSON 文件的登陆区域。

那里有数以万计的小(最多几 MB)文件。 Spark 代码会定期读取这些文件并执行一些转换。

我希望文件只被读取一次,并且 Spark 脚本是幂等的。 如何确保不会一次又一次地读取文件?我如何以有效的方式做到这一点?

我是这样读取数据的:

spark.read.json("/mnt/input_location/*.json")

我想到了以下方法:

  1. 使用已处理的文件名创建一个 Delta 表,并在输入 DataFrame 上运行 EXCEPT 转换
  2. 将处理后的文件移动到不同的位置(或重命名)。我宁愿不那样做。如果我需要重新处理数据,我需要再次运行 rename,此操作需要很长时间。

我希望有更好的方法。请提出一些建议。

【问题讨论】:

    标签: apache-spark azure-data-lake delta-lake


    【解决方案1】:

    您可以使用启用了检查点和Trigger.Once 的结构化流作业。

    该作业的检查点文件将跟踪该作业已使用的 JSON 文件。此外,Trigger.Once 触发器将使该流式作业如同批处理作业一样。

    Databricks 有一篇很好的文章解释了“为什么 Streaming and RunOnce 比 Batch 更好”。

    您的结构化流式传输作业可能如下所示:

    val checkpointLocation = "/path/to/checkpoints"
    val pathToJsonFiles = "/mnt/input_location/"
    val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)
    
    val query = streamDF
      .[...] // apply your processing
      .writeStream
      .format("console") // change sink format accordingly
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.Once)
      .start()
    
    query.awaitTermination()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-10-14
      • 1970-01-01
      • 1970-01-01
      • 2019-11-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多