【问题标题】:overwrite hive partitions using spark使用 spark 覆盖 hive 分区
【发布时间】:2018-10-03 06:21:29
【问题描述】:

我正在使用 AWS,并且我有使用 Spark 和 Hive 的工作流程。我的数据按日期分区,所以每天我的 S3 存储中都有一个新分区。 我的问题是有一天加载数据失败,我必须重新执行该分区。接下来是写的代码:

df                            // My data in a Dataframe
  .write
  .format(getFormat(target))  // csv by default, but could be parquet, ORC...
  .mode(getSaveMode("overwrite"))  // Append by default, but in future it should be Overwrite
  .partitionBy(partitionName) // Column of the partition, the date
  .options(target.options)    // header, separator...
  .option("path", target.path) // the path where it will be storage
  .saveAsTable(target.tableName)  // the table name

我的流程中会发生什么?如果我使用 SaveMode.Overwrite,整个表将被删除,我将只保存分区。如果我使用 SaveMode.Append 我可能有重复的数据。

搜索了一下,发现Hive支持这种覆盖,只有分区,但是用hql语句,我没有。

我们需要 Hive 上的解决方案,所以我们不能使用这个 alternative option(直接到 csv)。

我发现这个Jira ticket 可以解决我遇到的问题,但是尝试使用最新版本的 Spark (2.3.0),情况是一样的。它会删除整个表并保存分区,而不是覆盖我的数据所具有的分区。

为了更清楚地说明这一点,这是一个例子:

由 A 分区

数据:

| A | B | C | 
|---|---|---| 
| b | 1 | 2 | 
| c | 1 | 2 |

表:

| A | B | C | 
|---|---|---| 
| a | 1 | 2 | 
| b | 5 | 2 | 

我想要的是:在表中,分区a留在表中,分区b用数据覆盖,并添加分区c。有没有使用 Spark 的解决方案可以做到这一点?

我的最后一个选择是首先删除要保存的分区,然后使用 SaveMode.Append,但如果没有其他解决方案,我会尝试这个。

【问题讨论】:

    标签: scala amazon-web-services apache-spark hadoop hive


    【解决方案1】:

    加上wandermonk@提到的,


    仅在 SQL 模式下支持动态分区插入(对于 INSERT OVERWRITE TABLE SQL 语句)。非基于文件的数据源(即 InsertableRelations)不支持动态分区插入。

    使用动态分区插入,OVERWRITE 关键字的行为由 spark.sql.sources.partitionOverwriteMode 配置属性控制(默认值:静态)。该属性控制 Spark 是否应该删除所有符合分区规范的分区,无论是否有数据要写入(静态)还是只删除那些将要写入数据的分区(动态)。

    启用动态覆盖模式后,Spark 只会删除它有数据要写入的分区。所有其他分区保持不变。

    来自

    从使用 Spark 写入动态分区 (https://medium.com/a-muggles-pensieve/writing-into-dynamic-partitions-using-spark-2e2b818a007a)


    Spark 现在像 Hive 一样写入分区数据 — 这意味着只有被 INSERT 查询触及的分区会被覆盖,而其他分区不会被触及。

    【讨论】:

      【解决方案2】:

      因此,如果您使用的是 Spark 版本

      想法是将数据集注册为表,然后使用 spark.sql() 运行 INSERT 查询。

      // Create SparkSession with Hive dynamic partitioning enabled
      val spark: SparkSession =
          SparkSession
              .builder()
              .appName("StatsAnalyzer")
              .enableHiveSupport()
              .config("hive.exec.dynamic.partition", "true")
              .config("hive.exec.dynamic.partition.mode", "nonstrict")
              .getOrCreate()
      // Register the dataframe as a Hive table
      impressionsDF.createOrReplaceTempView("impressions_dataframe")
      // Create the output Hive table
      spark.sql(
          s"""
            |CREATE EXTERNAL TABLE stats (
            |   ad            STRING,
            |   impressions   INT,
            |   clicks        INT
            |) PARTITIONED BY (country STRING, year INT, month INT, day INT)
            |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
          """.stripMargin
      )
      // Write the data into disk as Hive partitions
      spark.sql(
          s"""
            |INSERT OVERWRITE TABLE stats 
            |PARTITION(country = 'US', year = 2017, month = 3, day)
            |SELECT ad, SUM(impressions), SUM(clicks), day
            |FROM impressions_dataframe
            |GROUP BY ad
          """.stripMargin
      )
      

      【讨论】:

        【解决方案3】:

        如果你是Spark 2.3.0,尝试将spark.sql.sources.partitionOverwriteMode设置为dynamic,数据集需要分区,写入模式覆盖。

        spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
        data.write.mode("overwrite").insertInto("partitioned_table")
        

        【讨论】:

        • 这非常有用,谢谢。但我实际上使用的是 Spark 2.2.1,所以我现在不能使用它,但将来升级 Spark 到 2.3.0 时我会使用它。
        • 很高兴您发现它很有用。
        • 这不适用于 spark 2.4.0,我得到了上面代码的重复项。有什么想法吗?
        • 这是否适用于.saveAsTable(...)的托管表
        • @ChitralVerma 它对我不起作用。 saveAsTable 无论如何都会删除所有数据。
        【解决方案4】:

        我建议使用 sparksession 运行 sql。您可以通过从现有数据集中选择列来运行“插入覆盖分区查询”。此解决方案肯定只会覆盖分区。

        【讨论】:

        • 这会删除我的整个表格,如原始问题中所述。
        猜你喜欢
        • 2019-12-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-10-27
        • 2017-03-01
        相关资源
        最近更新 更多