【问题标题】:Using checkpointed dataframe to overwrite table fails with FileNotFoundException使用检查点数据框覆盖表失败并出现 FileNotFoundException
【发布时间】:2019-06-27 11:32:14
【问题描述】:

我在 pySpark 中有一些数据框 df,它是由调用产生的:

df = spark.sql("select A, B from org_table")
df = df.stuffIdo

我想覆盖脚本末尾的org_table。 由于禁止覆盖输入表,我检查了我的数据:

sparkContext.setCheckpointDir("hdfs:/directoryXYZ/PrePro_temp")
checkpointed = df.checkpoint(eager=True)

现在应该打破血统,我还可以使用checkpointed.show()(有效)查看我的检查点数据。不工作的是写表:

checkpointed.write.format('parquet')\
    .option("checkpointLocation", "hdfs:/directoryXYZ/PrePro_temp")\
    .mode('overwrite').saveAsTable('org_table')

这会导致错误:

原因:java.io.FileNotFoundException:文件不存在:hdfs://org_table_path/org_table/part-00081-4e9d12ea-be6a-4a01-8bcf-1e73658a54dd-c000.snappy.parquet

我已经尝试了几件事,比如在写作之前刷新 org_table 等,但我在这里感到困惑。 我该如何解决这个错误?

【问题讨论】:

  • 嗨@Markus,有什么消息你是怎么解决这个问题的?

标签: python apache-spark pyspark pyspark-sql spark-checkpoint


【解决方案1】:

我会小心转换输入是新输出的此类操作。这样做的原因是,如果出现任何错误,您可能会丢失数据。让我们假设您的转换逻辑有问题,并且您生成了无效数据。但你只在一天后看到了这一点。此外,要修复错误,您不能使用刚刚转换的数据。您需要转换之前的数据。您如何使数据再次保持一致?

另一种方法是:

  • 暴露一个视图
  • 在每个批次中,您都在编写一个新表,最后您只需用这个新表替换视图
  • 几天后,您还可以计划一个清理工作,删除过去 X 天的表

如果您想保留您的解决方案,为什么不简单地这样做而不是处理检查点?

df.write.parquet("hdfs:/directoryXYZ/PrePro_temp")\
    .mode('overwrite')

df.load("hdfs:/directoryXYZ/PrePro_temp").write.format('parquet').mode('overwrite').saveAsTable('org_table')

当然,您会读取两次数据,但它看起来不像带有检查点的那次那么难看。此外,您每次都可以将“中间”数据存储在不同的目录中,因此您可以解决我在开始时暴露的问题。即使您有错误,您仍然可以通过简单地选择一个好的目录并在 org_table 中执行 .write.format(...) 来带来有效版本的数据。

【讨论】:

    猜你喜欢
    • 2014-05-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-13
    相关资源
    最近更新 更多