【问题标题】:Hive on Spark list all partitions for specific hive table and adding a partitionSpark 上的 Hive 列出特定 hive 表的所有分区并添加分区
【发布时间】:2017-03-09 03:42:48
【问题描述】:

我正在使用 spark 2.0,我想知道,是否可以列出特定 hive 表的所有文件?如果是这样,我可以使用 spark sc.textFile("file.orc") 直接增量更新这些文件。 如何向 hive 表添加新分区?我可以从 spark 使用 hive 元存储上的任何 api 吗?

有什么方法可以获取映射数据框row => partition_path的内部配置单元函数

我的主要理由是对表进行增量更新。现在我想出的唯一方法是FULL OUTER JOIN SQL +SaveMode.Overwrite,效率不高,因为他会覆盖所有表,而我的主要兴趣是针对某些特定分区的增量更新/添加新分区

编辑 从我在 HDFS 上看到的情况来看,当 SaveMode.Overwrite spark 将发出表定义时,即CREATE TABLE my_table .... PARTITION BY (month,..)。 spark将所有文件放在$HIVE/my_table下而不是$HIVE/my_table/month/...下,这意味着他没有对数据进行分区。当我写 df.write.partitionBy(...).mode(Overwrite).saveAsTable("my_table") 时,我在 hdfs 上看到它是正确的。 我使用了SaveMode.Overwrite,因为我正在更新记录而不是附加数据。

我使用 spark.table("my_table") 加载数据,这意味着 spark 延迟加载表,这是一个问题,因为我不想加载所有表只是 if 的一部分。

问题:

1.spark是否会打乱数据,因为我使用了partitionBy(),或者他比较当前分区,如果相同,他不会打乱数据。

2.spark 是否足够聪明,可以在从数据中更改部分数据(即仅针对特定月份/年份)时使用分区修剪,并应用该更改而不是加载所有数据? (FULL OUTER JOIN 基本上是扫描所有表的操作)

【问题讨论】:

    标签: apache-spark hive


    【解决方案1】:

    添加分区:

    可以使用DataFrameWriter 中提供的partitionBy 为非流式数据添加分区,或使用DataStreamWriter 用于流式数据。

    public DataFrameWriter<T> partitionBy(scala.collection.Seq<String> colNames)
    

    所以如果你想通过yearmonth 对数据进行分区,spark 会将数据保存到如下文件夹:

    year=2016/month=01/
    year=2016/month=02/
    

    您提到了orc - 您可以使用另存为orc 格式:

    df.write.partitionBy('year', 'month').format("orc").save(path)
    

    但您可以轻松地插入到 hive 表中,例如:

    df.write.partitionBy('year', 'month').insertInto(String tableName)
    

    获取所有分区:

    Spark sql 基于 Hive 查询语言,因此您可以使用 SHOW PARTITIONS 获取特定表中的分区列表。

    sparkSession.sql("SHOW PARTITIONS partitionedHiveTable")
    

    只需确保您在使用 SparkSessionBuilder 创建会话时拥有.enableHiveSupport(),并确保您是否正确配置了hive-conf.xml etc.

    【讨论】:

    • 假设我为同一个模式调用了两次partitionBy()。火花是否足够聪明地认识到他可以避免这个操作,即假设我有你提到的数据并且我想在我的数据框中添加“year=2017/month=01”,火花是否足够聪明,可以识别出他没有必须从year2016/month=1 洗牌/加载数据?
    • show partitions my_table 命令没有给我hadoop文件系统上文件的位置/路径。
    • @DavidH 当您有一个带有year 2017month 01 的数据框并将这些数据写入表中时,spark 将创建此分区并存储新数据,而无需从year2016/month=1 加载数据。我没有得到你的第二条评论。当您调用show partitions my_table 并且您启用了hivesupport 时,spark sql 应该显示列表,例如:yearX/monthY 用于分区my_table
    • 查看我的编辑。您确定完全外连接足够聪明,不会加载所有表吗?如果是这样,它解决了问题
    • @DavidH 如果我理解正确你想要什么 - df.write.partitionBy(...).mode(Overwrite).saveAsTable("my_table") 没问题。我猜你需要什么 - 仅覆盖数据框中的数据并保留其他记录而不进行修改。正确的?如果是这样,AFAIK 就不那么容易了,而且 spark 不是为此目的而设计的,而是为了分析而设计的。您可以做些什么来避免 FULL OUTER JOIN 和重写所有表是删除选定的分区并在写入阶段后用您的数据加载新分区。
    【解决方案2】:

    在使用scala的spark中,我们可以使用目录来获取分区:

    spark.catalog.listColumns(<databasename>, <tablename>)
    .filter($"isPartition" === true)
    .select($"name").collect()
    

    【讨论】:

    • 这只会给出分区列名而不是值本身。
    猜你喜欢
    • 1970-01-01
    • 2019-10-27
    • 1970-01-01
    • 1970-01-01
    • 2015-06-12
    • 2014-12-05
    • 2018-12-28
    • 1970-01-01
    • 2016-07-15
    相关资源
    最近更新 更多