【问题标题】:How do I delete a parquet file in spark scala?如何在 spark scala 中删除镶木地板文件?
【发布时间】:2020-09-03 00:19:53
【问题描述】:

我正在写入 databricks 中的 parquet 文件,但在此之前,我想删除它的旧版本。

这是我的写法:

report.coalesce(1).write.mode("append").partitionBy("Name").parquet(s"s3://${reportBucket}/reports/dashboard")

我不知道如何检查此文件是否存在,如果存在则删除。

类的一些伪代码,以及调用它的行。我正在尝试检查输出文件是否存在,如果存在,则将其删除。删除后,该类将运行两次,并将结果附加到 parquet 文件中。但是必须在run2之后才能删除,而不是run1。

class WriteReport(val run: String = "run1") {
val report = spark.read.parquet(s"blablah")
report.createOrReplaceTempView("report")

val dashboard = spark.sql (""" 
                SELECT name as Name from Table
                """)

report.coalesce(1).write.mode("append").partitionBy("Name").parquet(s"s3://${reportBucket}/reports/dashboard")
}

val n_b = new Report ("run1")
val n_g = new Report ("run2")

【问题讨论】:

  • 为什么不使用覆盖模式?

标签: scala apache-spark databricks


【解决方案1】:

Spark 不支持 S3 delete ,您只能在 您需要构建自己的逻辑的框架和其他外部框架任务 在 Spark 之外。在触发 EMR 之前,您需要将 lamda 配置为 清理你的目录并确认删除触发 Spark Job。

在重新创建文件之前删除本地目录:

    import scala.reflect.io.Directory
    import java.io.File

    val dir = new Directory(new File("/yourDirectory"))
    dir.deleteRecursively()

在重新创建文件之前删除 AWS S3 中的目录:

AWS S3 在 PUTS 对象上具有写后读一致性,覆盖 PUTS 和 DELETES 对象的最终一致性,因此一旦您删除不会同时删除并且需要一些时间

所以使用 S3 commonds 并删除并同时运行新作业会遇到一些问题,您需要构建逻辑以每次创建单独的目录并在工作完成后删除。

Java:

您需要使用 AWS sdk 删除,因为 Spark 不支持从 S3 删除任何命令

    if (s3Client.doesBucketExist(bucketName)) {
                ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
                        .withBucketName(bucketName)
                        .withPrefix("foo/bar/baz");

                ObjectListing objectListing = s3Client.listObjects(listObjectsRequest);

                while (true) {
                    for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
                        s3Client.deleteObject(bucketName, objectSummary.getKey());
                    }
                    if (objectListing.isTruncated()) {
                        objectListing = s3Client.listNextBatchOfObjects(objectListing);
                    } else {
                        break;
                    }
                }
            }

BOTO S3 SDK:

     import boto3

     client = boto3.client('s3')
    client.delete_object(Bucket='mybucketname', Key='myfile.whatever')

【讨论】:

  • Spark 可以与 hadoop、Google 存储和其他文件系统交互,并且鉴于其是一个大数据处理工具,数据不太可能驻留在本地文件系统中。
  • GCP 、AWS 和 Azure 有自己的 SDK ,请更新问题你正在使用什么云
  • 这不是我的问题,只是注意到这不太可能有帮助。我想如果有一种方法可以使用 spark api 并让 spark 处理文件系统,这将是一个很好的答案,因为它将支持 spark 支持的任何内容。
  • Spark 不提供对 S3 delete 的支持,您只能在框架和其他需要在 Spark 之外构建自己的逻辑的外部框架任务下工作。在触发 EMR 之前,您需要配置 lamda 以清理您的目录并在确认删除时触发 Spark Job
  • 啊,这真是有用的信息!您能否将 spark 不支持在 s3 上删除到您的答案中,如果您知道如何在 hdfs 上执行此操作,也许可以为 hadoop 提供答案,因为 hdfs 很常见。
猜你喜欢
  • 1970-01-01
  • 2020-02-03
  • 1970-01-01
  • 2022-01-19
  • 2019-10-29
  • 1970-01-01
  • 2019-11-20
  • 2019-06-02
  • 1970-01-01
相关资源
最近更新 更多