【问题标题】:Overwrite specific partitions in spark dataframe write method覆盖火花数据帧写入方法中的特定分区
【发布时间】:2016-11-24 01:38:30
【问题描述】:

我想在 spark 中覆盖特定分区而不是全部。我正在尝试以下命令:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')

其中 df 是具有要覆盖的增量数据的数据帧。

hdfs-base-path 包含主数据。

当我尝试上述命令时,它会删除所有分区,并将 df 中存在的分区插入 hdfs 路径。

我的要求是仅覆盖指定 hdfs 路径中 df 中存在的那些分区。有人可以帮我吗?

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    这是一个常见问题。 Spark 到 2.0 的唯一解决方案是直接写入分区目录,例如,

    df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
    

    如果您使用的是 2.0 之前的 Spark,您需要使用以下方法阻止 Spark 发出元数据文件(因为它们会破坏自动分区发现):

    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    

    如果您使用的是 1.6.2 之前的 Spark,您还需要删除 /root/path/to/data/partition_col=value 中的 _SUCCESS 文件,否则它的存在会破坏自动分区发现。 (我强烈建议使用 1.6.2 或更高版本。)

    您可以从我的 Spark 峰会 Bulletproof Jobs 上的演讲中获得有关如何管理大型分区表的更多详细信息。

    【讨论】:

    • 非常感谢 Sim 的回答。还有几个疑问,如果假设初始数据帧包含大约 100 个分区的数据,那么我是否必须将此数据帧拆分为另外 100 个具有相应分区值的数据帧并直接插入分区目录。可以并行保存这 100 个分区吗?另外我使用的是 Spark 1.6.1 如果我使用的是 orc 文件格式,我该如何停止为此发出元数据文件,是否与您提到的 parquet 相同?
    • 回复:元数据,不,ORC 是一种不同的格式,我认为它不会产生非数据文件。对于 1.6.1,您只需要分区树的子目录中的 ORC 文件。因此,您必须手动删除_SUCCESS。您可以并行写入多个分区,但不能来自同一个作业。根据您的平台功能启动多个作业,例如使用 REST API。
    • 有什么更新吗? saveToTable() 会覆盖特定的分区吗? spark 是否足够聪明,可以确定哪些分区被覆盖了?
    【解决方案2】:

    使用 Spark 1.6...

    HiveContext 可以大大简化这个过程。关键是您必须首先使用 CREATE EXTERNAL TABLE 语句在 Hive 中创建表并定义分区。例如:

    # Hive SQL
    CREATE EXTERNAL TABLE test
    (name STRING)
    PARTITIONED BY
    (age INT)
    STORED AS PARQUET
    LOCATION 'hdfs:///tmp/tables/test'
    

    从这里,假设您有一个 Dataframe,其中包含特定分区(或多个分区)的新记录。您可以使用 HiveContext SQL 语句使用此 Dataframe 执行 INSERT OVERWRITE,这将仅覆盖 Dataframe 中包含的分区的表:

    # PySpark
    hiveContext = HiveContext(sc)
    update_dataframe.registerTempTable('update_dataframe')
    
    hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                       SELECT name, age
                       FROM update_dataframe""")
    

    注意:此示例中的 update_dataframe 的架构与目标 test 表的架构相匹配。

    使用这种方法容易犯的一个错误是跳过 Hive 中的 CREATE EXTERNAL TABLE 步骤,而只使用 Dataframe API 的 write 方法创建表。特别是对于基于 Parquet 的表,不会适当地定义该表以支持 Hive 的 INSERT OVERWRITE... PARTITION 函数。

    希望这会有所帮助。

    【讨论】:

    • 我尝试了上述方法,我收到了类似Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict的错误
    • 我没有任何静态分区列
    【解决方案3】:

    如果你使用 DataFrame,可能你想使用 Hive 表而不是数据。 在这种情况下,您只需要调用方法

    df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)
    

    它将覆盖 DataFrame 包含的分区。

    不必指定格式(orc),因为 Spark 将使用 Hive 表格式。

    在 Spark 1.6 版中运行良好

    【讨论】:

    • 如果之前的分区不在当前数据帧中,则删除它们。
    • 如果表是基于多列分区的,比如年,月,我只想根据年覆盖,如何更新数据?
    • 我也收到错误:AnalysisException: u"insertInto() 不能与 partitionBy() 一起使用。已经为表定义了分区列。没有必要使用 partitionBy( ).;"
    • without partitionBy 即使使用模式(“覆盖”),我也会插入重复的数据
    • 这是部分正确的。请参阅 Surya Murali 评论,了解我需要添加的其他设置才能使其正常工作。至少在我的情况下有效(spark 1.6,scala)
    【解决方案4】:

    您可以执行以下操作来使作业可重入(幂等): (在 spark 2.2 上试过这个)

    # drop the partition
    drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
    print drop_query
    spark.sql(drop_query)
    
    # delete directory
    dbutils.fs.rm(<partition_directoy>,recurse=True)
    
    # Load the partition
    df.write\
      .partitionBy("partition_col")\
      .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
    

    【讨论】:

    • 为什么选择 Python 2?这看起来像 Databricks 特定的,很高兴为其他不使用该平台的人提一下。我喜欢幂等,但这是真的吗?如果删除目录成功但追加不成功怎么办?如何保证 df 包含已删除分区的数据?
    【解决方案5】:

    终于!这现在是 Spark 2.3.0 中的一个功能: SPARK-20236

    要使用它,需要将spark.sql.sources.partitionOverwriteMode设置为动态,数据集需要分区,写入模式overwrite。示例:

    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    data.write.mode("overwrite").insertInto("partitioned_table")
    

    我建议在写入之前根据您的分区列重新分区,这样您就不会最终每个文件夹有 400 个文件。

    在 Spark 2.3.0 之前,最好的解决方案是启动 SQL 语句来删除这些分区,然后以 append 模式写入。

    【讨论】:

    • 我很难找到使用它的设置,所以在这里留下参考:stackoverflow.com/questions/50006526/…
    • 您能否编辑答案以显示来自 JIRA 的示例代码?
    • 不起作用。尚未写入 HDFS 的新数据不会写入其中。
    • 如果我覆盖单个分区并且我知道该分区的名称先验,有没有办法指定spark就像我们可以做in Hive?我问这个是因为这会给我很多保证和作为健全性检查的工作,而且我相信也会有一些性能优势(因为不需要对每条记录进行分区的运行时解析)
    • @y2k-shubham 是的,使用 spark.sql('insert overwrite table TABLE_NAME partition(PARTITION_NAME=PARTITION_VALUE) YOUR SELECT STATEMENT) 这至少适用于 2.2,如果早期版本支持此功能,则不适用。
    【解决方案6】:

    我建议您先进行清理,然后使用Append 模式写入新分区:

    import scala.sys.process._
    def deletePath(path: String): Unit = {
        s"hdfs dfs -rm -r -skipTrash $path".!
    }
    
    df.select(partitionColumn).distinct.collect().foreach(p => {
        val partition = p.getAs[String](partitionColumn)
        deletePath(s"$path/$partitionColumn=$partition")
    })
    
    df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)
    

    这将只删除新分区。写入数据后,如果需要更新 Metastore,请运行此命令:

    sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")
    

    注意:deletePath 假定您的系统上有 hfds 命令可用。

    【讨论】:

      【解决方案7】:

      我尝试了以下方法来覆盖 HIVE 表中的特定分区。

      ### load Data and check records
          raw_df = spark.table("test.original")
          raw_df.count()
      
      lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925
      
      
      ### Check data in few partitions.
          sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
          print "Number of records: ", sample.count()
          sample.show()
      
      
      ### Back-up the partitions before deletion
          raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")
      
      
      ### UDF : To delete particular partition.
          def delete_part(table, part):
              qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
              spark.sql(qry)
      
      
      ### Delete partitions
          part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
          part_list = part_df.rdd.map(lambda x : x[0]).collect()
      
          table = "test.original"
          for p in part_list:
              delete_part(table, p)
      
      
      ### Do the required Changes to the columns in partitions
          df = spark.table("test.original_bkp")
          newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
          newdf.select("c_customer_sk", "c_preferred_cust_flag").show()
      
      
      ### Write the Partitions back to Original table
          newdf.write.insertInto("test.original")
      
      
      ### Verify data in Original table
          orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()
      
      
      
      Hope it helps.
      
      Regards,
      
      Neeraj
      

      【讨论】:

        【解决方案8】:

        我建议您创建一个像目标表一样的临时表并将数据插入其中,而不是直接写入目标表。

        CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';
        

        创建表后,您可以将数据写入tmpLocation

        df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)
        

        然后你可以通过执行来恢复表分区路径:

        MSCK REPAIR TABLE tmpTbl;
        

        通过查询 Hive 元数据来获取分区路径,例如:

        SHOW PARTITONS tmpTbl;
        

        trgtTbl 中删除这些分区并将目录从tmpTbl 移动到trgtTbl

        【讨论】:

          【解决方案9】:

          正如 jatin 所写,您可以从配置单元和路径中删除分区,然后附加数据 由于我浪费了太多时间,我为其他 spark 用户添加了以下示例。 我使用 Scala 和 spark 2.2.1

            import org.apache.hadoop.conf.Configuration
            import org.apache.hadoop.fs.Path
            import org.apache.spark.SparkConf
            import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
          
            case class DataExample(partition1: Int, partition2: String, someTest: String, id: Int)
          
           object StackOverflowExample extends App {
          //Prepare spark & Data
          val sparkConf = new SparkConf()
          sparkConf.setMaster(s"local[2]")
          val spark = SparkSession.builder().config(sparkConf).getOrCreate()
          val tableName = "my_table"
          
          val partitions1 = List(1, 2)
          val partitions2 = List("e1", "e2")
          val partitionColumns = List("partition1", "partition2")
          val myTablePath = "/tmp/some_example"
          
          val someText = List("text1", "text2")
          val ids = (0 until 5).toList
          
          val listData = partitions1.flatMap(p1 => {
            partitions2.flatMap(p2 => {
              someText.flatMap(
                text => {
                  ids.map(
                    id => DataExample(p1, p2, text, id)
                  )
                }
              )
            }
            )
          })
          
          val asDataFrame = spark.createDataFrame(listData)
          
          //Delete path function
          def deletePath(path: String, recursive: Boolean): Unit = {
            val p = new Path(path)
            val fs = p.getFileSystem(new Configuration())
            fs.delete(p, recursive)
          }
          
          def tableOverwrite(df: DataFrame, partitions: List[String], path: String): Unit = {
            if (spark.catalog.tableExists(tableName)) {
              //clean partitions
              val asColumns = partitions.map(c => new Column(c))
              val relevantPartitions = df.select(asColumns: _*).distinct().collect()
              val partitionToRemove = relevantPartitions.map(row => {
                val fields = row.schema.fields
                s"ALTER TABLE ${tableName} DROP IF EXISTS PARTITION " +
                  s"${fields.map(field => s"${field.name}='${row.getAs(field.name)}'").mkString("(", ",", ")")} PURGE"
              })
          
              val cleanFolders = relevantPartitions.map(partition => {
                val fields = partition.schema.fields
                path + fields.map(f => s"${f.name}=${partition.getAs(f.name)}").mkString("/")
              })
          
              println(s"Going to clean ${partitionToRemove.size} partitions")
              partitionToRemove.foreach(partition => spark.sqlContext.sql(partition))
              cleanFolders.foreach(partition => deletePath(partition, true))
            }
            asDataFrame.write
              .options(Map("path" -> myTablePath))
              .mode(SaveMode.Append)
              .partitionBy(partitionColumns: _*)
              .saveAsTable(tableName)
          }
          
          //Now test
          tableOverwrite(asDataFrame, partitionColumns, tableName)
          spark.sqlContext.sql(s"select * from $tableName").show(1000)
          tableOverwrite(asDataFrame, partitionColumns, tableName)
          
          import spark.implicits._
          
          val asLocalSet = spark.sqlContext.sql(s"select * from $tableName").as[DataExample].collect().toSet
          if (asLocalSet == listData.toSet) {
            println("Overwrite is working !!!")
          }
          

          }

          【讨论】:

            【解决方案10】:

            使用 Scala 在 Spark 2.3.1 上对此进行了测试。 上面的大多数答案都是写入 Hive 表。但是,我想直接写入 disk,该文件夹顶部有一个 external hive table

            首先需要的配置

            val sparkSession: SparkSession = SparkSession
                  .builder
                  .enableHiveSupport()
                  .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
                  .appName("spark_write_to_dynamic_partition_folders")
            

            这里的用法:

            DataFrame
            .write
            .format("<required file format>")
            .partitionBy("<partitioned column name>")
            .mode(SaveMode.Overwrite) // This is required.
            .save(s"<path_to_root_folder>")
            

            【讨论】:

              【解决方案11】:

              在 insertInto 语句中添加 'overwrite=True' 参数可以解决这个问题:

              hiveContext.setConf("hive.exec.dynamic.partition", "true")
              hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
              
              df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)
              

              默认overwrite=False。将其更改为True 允许我们覆盖包含在df 和partioned_table 中的特定分区。这有助于我们避免用df 覆盖partioned_table 的全部内容。

              【讨论】:

              • 似乎改变了这种做法。
              • 这对我有用,虽然它不接受 Spark 2.3 中的“overwrite=True”参数
              【解决方案12】:

              对于 >= Spark 2.3.0:

              spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
              data.write.insertInto("partitioned_table", overwrite=True)
              

              【讨论】:

              • 这仍然会覆盖整个表。
              【解决方案13】:
              spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
              data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")
              

              这对我适用于 AWS Glue ETL 作业(Glue 1.0 - Spark 2.4 - Python 2)

              【讨论】:

              • 这种方法对作业书签的表现如何?假设您有一个现有分区(例如一天),它只有当天的前 12 小时数据,并且新文件已经到达您的源中,这些文件应该添加到该分区的后 12 小时,我担心胶水作业书签非常幼稚,它最终只会在第二个 12 小时内从新文件中写入数据。还是你不使用工作书签?
              • 好问题!我也有同样的担忧。我的用例是我专门要求 Glue 重新处理某些分区并重新写入结果(使用上述两行)。启用作业书签后,它拒绝重新处理“旧”数据。
              • 所以你不使用书签?这几乎是我能看到的唯一原因,因为我只需要使用glueContext 而不是坚持使用 Spark。我不想管理已处理的状态,但我发现书签很不稳定,依赖于文件修改的时间戳,除了残酷的重置之外,没有办法同步它。为什么是 Python 2 而不是 3?
              • 是的,工作书签已经困扰我一段时间了。这对一些低调的日常工作很有好处。但是一旦你有了一点“越野”的动作,那东西就不是没用了。关于 Python 版本,当从 Glue 0.9 升级时,查看两个选项(Python 2 vs 3),我只是不想破坏任何东西,因为代码是在 Python 2 时代编写的 ^_^
              • “比没用”,指出。除了print is a functionunicode done properlyliteral long not necessary 之外,2->3 并没有什么意义。 Pyspark DSL 语法似乎相同。 2020 年正式不支持 Python 2,是时候放弃它了。
              【解决方案14】:

              我的解决方案意味着从 spark 数据帧开始覆盖每个特定分区。它跳过了删除分区部分。我正在使用 pyspark>=3,我正在 AWS s3 上写作:

              def write_df_on_s3(df, s3_path, field, mode):
                  # get the list of unique field values
                  list_partitions = [x.asDict()[field] for x in df.select(field).distinct().collect()]
                  df_repartitioned = df.repartition(1,field)
                  for p in list_partitions:
                      # create dataframes by partition and send it to s3
                      df_to_send = df_repartitioned.where("{}='{}'".format(field,p))
                      df_to_send.write.mode(mode).parquet(s3_path+"/"+field+"={}/".format(p))
              

              这个简单函数的参数是 df、s3_path、分区字段和模式(覆盖或追加)。第一部分获取唯一的字段值:这意味着如果我按每日对 df 进行分区,我将获得 df 中所有每日的列表。然后我重新分区df。最后,我每天选择重新分区的 df,并将其写入其特定的分区路径。

              您可以根据需要更改重新分区整数。

              【讨论】:

                猜你喜欢
                • 2022-10-05
                • 1970-01-01
                • 1970-01-01
                • 2020-08-11
                • 2018-11-27
                • 2016-03-30
                • 1970-01-01
                • 1970-01-01
                • 2021-10-15
                相关资源
                最近更新 更多