【问题标题】:reduce the amount of data scanned by Athena when using aggregate functions在使用聚合函数时减少 Athena 扫描的数据量
【发布时间】:2019-04-26 18:14:48
【问题描述】:

以下查询扫描 100 mb 的数据。

select * from table where column1 = 'val' and partition_id = '20190309';

但是下面的查询扫描了 15 GB 的数据(有超过 90 个分区)

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

如何优化第二个查询以扫描与第一个相同数量的数据?

【问题讨论】:

  • Athena 基于 Presto。在 Presto 中,这将通过动态过滤得到改进 (github.com/prestosql/presto/issues/52)。尽管这种查询案例会从不同的执行路径中受益更多,您可以在其中执行部分查询并重新规划其余部分 (github.com/prestosql/presto/issues/684)。
  • 感谢@PiotrFindeisen。您是否建议必须首先获取最新的分区并将其作为值传递给第二个查询?
  • 目前,是的,我想是的。您可以在该问题下发表评论以详细描述您的用例。

标签: sql presto amazon-athena trino


【解决方案1】:

这里有两个问题。上面select max(partition_id) from table 的标量子查询的效率,以及@PiotrFindeisen 指出的围绕动态过滤的效率。

第一个问题是对 Hive 表的分区键的查询比看起来要复杂得多。大多数人会认为,如果您想要分区键的最大值,您可以简单地对分区键执行查询,但这不起作用,因为 Hive 允许分区为空(并且它还允许非空文件不包含任何行)。具体来说,select max(partition_id) from table 上面的标量子查询需要Trino (formerly PrestoSQL) 来找到包含至少一行的最大分区。理想的解决方案是在 Hive 中拥有完美的统计信息,但如果没有做到这一点,引擎将需要为 Hive 提供自定义逻辑,以打开分区文件,直到找到非空文件。

如果您确定您的仓库不包含空分区(或者您可以接受其中的含义),您可以将标量子查询替换为隐藏的 $partitions 表上的一个“

select * 
from table 
where column1 = 'val' and 
    partition_id = (select max(partition_id) from "table$partitions");

第二个问题是@PiotrFindeisen 指出的,它与查询计划和执行的方式有关。大多数人会看上面的查询,看到引擎显然应该在计划期间计算出select max(partition_id) from "table$partitions" 的值,将其内联到计划中,然后继续优化。不幸的是,一般来说,这是一个相当复杂的决定,因此引擎只是将其建模为广播连接,其中一部分执行计算出该值,并将该值广播给其他工作人员。问题是执行的其余部分无法将此新信息添加到现有处理中,因此它只是扫描所有数据,然后过滤掉您试图跳过的值。有一个项目正在进行中添加此dynamic filtering,但尚未完成。

这意味着您今天能做的最好的事情就是运行两个单独的查询:一个获取最大 partition_id,另一个获取内联值。

顺便说一句,Presto 0.199 中添加了隐藏的“$partitions”表,我们修复了 0.201 中的一些小错误。我不确定 Athena 基于哪个版本,但我相信它已经过时了(我写这个答案时的当前版本是309

【讨论】:

  • 感谢@Dain Sundstrom。我会试试这个。在我的情况下,该表将始终包含 1 个或多个分区。
  • 虽然这是一个很好的答案,它解释了细节以及为什么它不像表面上看起来那么容易,但使用 …$partitions 的建议在 Athena 中不起作用,因为它基于 Presto 0.172。
  • 我能够在this answer 的基础上使用information_schema.__internal_partitions__ 提出一个解决方案来解决您提到的第一个问题。真的很不幸,Athena/Presto 仍然没有解决第二个问题的方法:(
  • 经过更多黑客攻击后,我还能够针对第二个问题提出部分缓解措施,这至少限制了扫描的数据量(作为答案发布在下面)。这对于我的特定用例来说足够好,尽管这可能不适用于所有用例,而且我不能 100% 确定依赖 information_schema.__internal_partitions_ 表的所有后果。
【解决方案2】:

编辑:Presto 在他们的0.193 release 中删除了__internal_partitions__ 表,所以我建议不要在任何生产系统中使用下面Slow aggregation queries for partition keys 部分中定义的解决方案,因为Athena '透明'更新 presto 版本。我最终只使用了幼稚的SELECT max(partition_date) ... 查询,但也使用了Lack of Dynamic Filtering 部分中概述的相同回溯技巧。它比使用__internal_partitions__ 表慢了大约 3 倍,但至少在 Athena 决定更新他们的 presto 版本时它不会中断。

----- 原帖-----

因此,我想出了一个相当老套的方法来为大型数据集上的基于日期的分区完成此任务,因为当您只需要回顾几个分区的数据以匹配最大值时, ,请注意,我不能 100% 确定 information_schema.__internal_partitions__ 表的使用有多脆弱。

正如@Dain 上面提到的,确实有两个问题。第一个是 max(partition_date) 查询的聚合有多慢,第二个是 Presto 缺乏对动态过滤的支持。

分区键的慢速聚合查询

为了解决第一个问题,我使用了information_schema.__internal_partitions__ 表,它允许我快速聚合表的分区,而无需扫描文件中的数据。 (请注意,以下查询中的partition_valuepartition_keypartition_number 都是__internal_partitions__ 表的列名,与您的表的列无关)

如果您的表只有一个分区键,您可以执行以下操作:

SELECT max(partition_value) FROM information_schema.__internal_partitions__
WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'

但如果您有多个分区键,则需要更多类似的东西:

SELECT max(partition_date) as latest_partition_date from (
  SELECT max(case when partition_key = 'partition_date' then partition_value end) as partition_date, max(case when partition_key = 'another_partition_key' then partition_value end) as another_partition_key
  FROM information_schema.__internal_partitions__
  WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'
  GROUP BY partition_number
)
WHERE
  -- ... Filter down by values for e.g. another_partition_key
)

这些查询应该运行得相当快(我的运行大约需要 1-2 秒),而无需扫描文件中的实际数据,但同样,我不确定使用这种方法是否有任何问题。

缺乏动态过滤

对于我的特定用例,我能够减轻第二个问题的最坏影响,因为我希望在从当前日期开始的有限时间内总是有一个分区(例如,我可以保证任何数据-生产或分区加载问题将在 3 天内得到解决)。事实证明,Athena 在使用 presto 的 datetime functions 时确实做了一些预处理,因此这与使用子查询的动态过滤没有相同类型的问题。

因此,您可以更改查询,以限制使用日期时间函数回溯实际最大值的距离,从而限制扫描的数据量。

SELECT * FROM "DATABASE_NAME"."TABLE_NAME"
WHERE partition_date >= cast(date '2019-06-25' - interval '3' day as varchar) -- Will only scan partitions from 3 days before '2019-06-25'
AND partition_date = (
  -- Insert the partition aggregation query from above here
)

【讨论】:

    【解决方案3】:

    我不知道它是否仍然相关,但刚刚发现:

    代替:

    select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);
    

    用途:

    select a.* from table a 
    inner join (select max(partition_id) max_id from table) b on a.partition_id=b.max_id
    where column1 = 'val';
    

    我认为这与优化连接以使用分区有关。

    【讨论】:

    • 谢谢,我试试看!!
    猜你喜欢
    • 1970-01-01
    • 2019-03-19
    • 1970-01-01
    • 2021-12-23
    • 2019-11-11
    • 2020-11-30
    • 1970-01-01
    • 1970-01-01
    • 2019-10-16
    相关资源
    最近更新 更多