【Question title】: Create a UDAF (not a UDTF) in Snowflake
【Posted】: 2021-07-27 12:24:25
【Question】:

The Snowflake documentation says: "Java UDFs return a scalar result. Java UDTFs are not currently supported." (reference)

That said, I created a Java UDF as shown below:

CREATE OR replace function MAP_COUNT(colValue String)
returns OBJECT 
language java
handler='Frequency.calculate'
target_path='@~/Frequency.jar'
as
$$
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;
    class Frequency {
        Map<String, Integer> frequencies = new HashMap<>();

        public Map<String, Integer> calculate(String colValue) {
            frequencies.putIfAbsent(colValue, 0);
            frequencies.computeIfPresent(colValue, (key, value) -> value + 1);
            return frequencies;
        }
    }
$$;

I use the MAP_COUNT UDF in a query like this:

with temp_1 as
(
    SELECT 'John' AS my_col, 27 as age
    UNION ALL
    SELECT 'John' AS my_col, 28 as age
    UNION ALL
    SELECT 'doe' AS my_col, 27 as age
    UNION ALL
    SELECT 'doe' AS my_col, 28 as age
)
select  MAP_COUNT(a.my_col) from temp_1 a;

I get the following result:

|MAP_COUNT(A.MY_COL)            |
|-------------------------------|
|{  "John": "1" }               |
|{  "John": "2" }               |
|{ "John": "2",  "doe": "1" }   |
|{  "John": "2",  "doe": "2"}   |

The result I expect from the UDF is:

|MAP_COUNT(A.MY_COL)            |
|-------------------------------|
|{  "John": "2",  "doe": "2"}   |

Is this possible in Snowflake?

What if I have a query like the following?

with temp_1 as
(
    SELECT 'John' AS my_col, 27 as age
    UNION ALL
    SELECT 'John' AS my_col, 28 as age
    UNION ALL
    SELECT 'doe' AS my_col, 27 as age
    UNION ALL
    SELECT 'doe' AS my_col, 28 as age
)
select  MAP_COUNT(a.my_col) as names, MAP_COUNT(a.age) as ages  from temp_1 a;

The result I expect from the UDF is:

|NAMES                          |AGES                           |
|-------------------------------|-------------------------------|
|{  "John": "2",  "doe": "2"}   |{  "27": "2",  "28": "2"}      |

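There are ways to achieve this by simply restructuring the query, but I would like to know whether it can be done by using a MAP_COUNT function in the select clause, similar to the OBJECT_AGG function.

【Comments】:

  • Snowflake Ideas has an entry called "Feature Request: Stored Aggregate Functions".

Tags: snowflake-cloud-data-platform user-defined-functions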


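【Answer 1】:

When you run a query that uses a UDF, not all rows necessarily go to the same instance of the UDF. For example, suppose you are selecting from a table like this: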

SELECT MyUdf(x) FROM T

Here T may consist of multiple micro-partitions, and the query is effectively executed like this:

SELECT MyUdf(x) FROM T_part1 UNION ALL
SELECT MyUdf(x) FROM T_part2 UNION ALL
SELECT MyUdf(x) FROM T_part3 UNION ALL
SELECT MyUdf(x) FROM T_part4

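There are four separate instances of MyUdf here, and each one sees only a subset of the rows of T.

Back to your example: you are trying to emulate a user-defined aggregate function, where one particular instance of the UDF sees every row. The way to guarantee that is to aggregate up front, for example: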
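The effect can be reproduced outside Snowflake with a minimal Java sketch. It assumes one handler instance per partition, mirroring the split above. The class name `PartitionedUdfDemo` and the assignment of rows to partitions are hypothetical and do not reflect how Snowflake actually schedules work.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionedUdfDemo {
    // Stateful handler in the style of the question: counts accumulate per instance.
    static class Frequency {
        private final Map<String, Integer> frequencies = new HashMap<>();

        Map<String, Integer> calculate(String colValue) {
            frequencies.merge(colValue, 1, Integer::sum);
            return new HashMap<>(frequencies); // snapshot returned for this row
        }
    }

    // Runs each partition through its own handler instance and collects
    // the per-row results, as a scalar UDF would return them.
    public static List<Map<String, Integer>> run(List<List<String>> partitions) {
        List<Map<String, Integer>> rows = new ArrayList<>();
        for (List<String> partition : partitions) {
            Frequency instance = new Frequency(); // fresh state per partition
            for (String value : partition) {
                rows.add(instance.calculate(value));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Hypothetical split of the four sample rows into two partitions.
        List<List<String>> partitions = List.of(
                List.of("John", "doe"),
                List.of("John", "doe"));
        run(partitions).forEach(System.out::println);
    }
}
```

With this split, no row ever reports the full counts of 2 for each name, because each instance only sees the rows of its own partition.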

CREATE OR replace function MAP_COUNT(colValue array)
returns OBJECT 
language java
handler='Frequency.calculate'
target_path='@~/Frequency.jar'
as
$$
    import java.util.HashMap;
    import java.util.Map;

    class Frequency {
        // Counts how often each value occurs in the pre-aggregated input array.
        public Map<String, Integer> calculate(String[] colValues) {
            // Local state: each call receives the whole array, so nothing is shared across rows.
            Map<String, Integer> frequencies = new HashMap<>();
            for (String colValue : colValues) {
                frequencies.merge(colValue, 1, Integer::sum);
            }
            return frequencies;
        }
    }
$$;

(Note that I changed the UDF and method signatures to use array and String[], respectively.) Now use it in a query:

with temp_1 as
(
    SELECT 'John' AS my_col, 27 as age
    UNION ALL
    SELECT 'John' AS my_col, 28 as age
    UNION ALL
    SELECT 'doe' AS my_col, 27 as age
    UNION ALL
    SELECT 'doe' AS my_col, 28 as age
)
select
  MAP_COUNT(ARRAY_AGG(a.my_col)) as names,
  MAP_COUNT(ARRAY_AGG(a.age)) as ages
from temp_1 a;

This gives me:

names                        ages
{ "John": "2", "doe": "2" }  { "27": "2", "28": "2" }

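Two problems remain:

  • This does not scale well. If either array exceeds 16 MB (the maximum size of a value), the query fails.
  • The syntax is clumsy. Ideally you would use the UDF like any other aggregate function, without having to wrap the input in ARRAY_AGG.

The good news is that both problems will be solved once Java UDAFs become available at some point in the future.

【Comments】: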
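To illustrate why a true aggregate function would remove both limits, here is a minimal plain-Java sketch of the usual aggregate shape: accumulate rows into partial state, merge partial states, then finish. The class and method names (`FrequencyAggregate`, `accumulate`, `merge`, `finish`) are hypothetical and are not a Snowflake API. The point is only that the state grows with the number of distinct values rather than the number of rows, and that no input array has to be materialized.

```java
import java.util.HashMap;
import java.util.Map;

public class FrequencyAggregate {
    private final Map<String, Integer> counts = new HashMap<>();

    // Called once per input row seen by this partial aggregate.
    public void accumulate(String value) {
        counts.merge(value, 1, Integer::sum);
    }

    // Folds in the partial state computed on another subset of rows.
    public void merge(FrequencyAggregate other) {
        other.counts.forEach((key, n) -> counts.merge(key, n, Integer::sum));
    }

    // Produces the final result for the group as a defensive copy.
    public Map<String, Integer> finish() {
        return new HashMap<>(counts);
    }

    public static void main(String[] args) {
        FrequencyAggregate part1 = new FrequencyAggregate();
        FrequencyAggregate part2 = new FrequencyAggregate();
        part1.accumulate("John");
        part1.accumulate("doe");
        part2.accumulate("John");
        part2.accumulate("doe");
        part1.merge(part2); // combine the two partial results
        System.out.println(part1.finish());
    }
}
```

Because partial states can be merged, it no longer matters which instance sees which rows, which is exactly the guarantee the ARRAY_AGG workaround has to buy by collecting everything first.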

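    【Answer 2】:

    You can now implement most of the desired functionality with a JavaScript UDTF. In particular, a UDTF can be configured to return an (aggregated) result only at the end of a "grouping". Here is an example: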

    CREATE OR REPLACE FUNCTION MAP_COUNT(COLVALUE varchar)
      RETURNS TABLE (FREQUENCIES variant)
      LANGUAGE JAVASCRIPT
      AS '
      {
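        initialize: function (argumentInfo, context) {
            // Per-partition state: a plain object mapping value -> count.
            this.freq = {};
        },
    
        processRow: function (row, rowWriter, context) {
            // Accumulate only; no output row is written per input row.
            var freqVal = this.freq[row.COLVALUE];
            this.freq[row.COLVALUE] = (freqVal === undefined ? 1 : 1 + freqVal);
        },
    
        finalize: function (rowWriter, context) {
            // Emit a single aggregated row once the partition is exhausted.
            rowWriter.writeRow({FREQUENCIES: this.freq});
        }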
    }
    '
    ;
    

    Create a temporary table to test it:

    create or replace temporary table mytemp as
    with temp_1 as
    (
        SELECT 'John' AS my_col, 27 as age
        UNION ALL
        SELECT 'John' AS my_col, 28 as age
        UNION ALL
        SELECT 'doe' AS my_col, 27 as age
        UNION ALL
        SELECT 'doe' AS my_col, 28 as age
    )
    select * from temp_1;
    

    Run the UDTF so that it produces a single-partition result:

    select agg.* from myTemp,
    table(map_count(my_col)) agg
    ;
        Result:  { "John": 2, "doe": 2 }
    

    Run the UDTF partitioned into separate groups:

    select my_col, agg.* from myTemp,
    table(map_count(age::varchar) over (partition by my_col)) agg
    ;
        Result:
        doe   { "27": 1, "28": 1 }
        John  { "27": 1, "28": 1 }
    

    【Comments】:

    • Please note that I need the result as follows: `names: { "John": "2", "doe": "2" } [col-1] ages: { "27": "2", "28": "2" } [col-2]`