【Question title】: Translate pgSQL Function to Snowflake
【Posted】: 2021-10-19 17:47:29
【Question description】:

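I am migrating my Postgres database to Snowflake, but I am struggling to translate a pgSQL function into a Snowflake UDF.

Before posting the script, here is a summary of what my function is supposed to do. The function uses three tables in my database: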

 -- events
    CREATE TABLE IF NOT EXISTS events (
    id bigint NOT NULL autoincrement start 1 increment 1 PRIMARY KEY,
    odb_created_at timestamp without time zone NOT NULL,
    event_time timestamp without time zone NOT NULL,
    device_type integer NOT NULL,
    event_type integer NOT NULL,
    ticket_type integer NOT NULL,
    card_nr character varying(100),
    count integer DEFAULT 1 NOT NULL,
    manufacturer character varying(200),
    carpark_id bigint
); 

 -- durations
CREATE TABLE IF NOT EXISTS durations (
    id bigint NOT NULL autoincrement start 1 increment 1 PRIMARY KEY,
    odb_created_at timestamp without time zone NOT NULL,
    event_id_arrival bigint,
    event_id_departure bigint,
    event_time_arrival timestamp without time zone,
    event_time_departure timestamp without time zone,
    card_nr character varying(100),
    ticket_type integer,
    duration integer,
    manufacturer character varying(200),
    carpark_id bigint
);

--properties
create or replace TABLE PROPERTIES (
    PROP_KEY VARCHAR(80) NOT NULL,
    PROP_VALUE VARCHAR(250),
    primary key (PROP_KEY)
);

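The events table collects all events, each of which is either an entry or an exit. The durations table holds the same information as events and, in addition, the duration between entry and exit. The properties table serves as the reference point for the next calculation run.

Entries that have already been processed must not be processed again, which is why we store the id from the events table as:

  • event_id_arrival if it is an entry (device_type 1)
  • event_id_departure if it is an exit (device_type 2)

Here is some sample data to recreate and populate the tables:

Events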

INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160996, '2021-10-02 04:28:26.338', '2021-10-01 09:14:41.32', 1, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160790, '2021-10-02 04:28:26.248', '2021-10-01 09:31:10.94', 2, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188146489, '2021-10-02 04:26:55.069', '2021-10-01 10:03:01.57', 1, 2, 500, '01479804030429500089598', 1, 'XX', 1563);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188146069, '2021-10-02 04:26:54.852', '2021-10-01 11:49:58.45', 2, 2, 500, '01479804030429500089598', 1, 'XX', 1563);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188161161, '2021-10-02 04:28:26.372', '2021-10-01 18:44:33.62', 1, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160950, '2021-10-02 04:28:26.329', '2021-10-01 18:45:51.903', 2, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188161227, '2021-10-02 04:28:26.374', '2021-10-01 23:21:18.58', 1, 2, 11, '04139733030897300003136', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160974, '2021-10-02 04:28:26.334', '2021-10-01 23:24:03.29', 2, 2, 11, '04139733030897300003136', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188239864, '2021-10-03 04:24:43.345', '2021-10-02 06:49:55.97', 1, 2, 11, '01719400030897300061410', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188239649, '2021-10-03 04:24:43.308', '2021-10-02 07:02:08.72', 2, 2, 11, '01719400030897300061410', 1, 'XX', 1852);

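Once these events are loaded, we run the function and get the corresponding rows in the durations table. The function finds matching entries and calculates the duration. It then inserts the results into the durations table and updates the properties table.

Function

My function searches the events table for rows that share the same card_nr and have device_type 1 (= entry) or device_type 2 (= exit). Once it finds a match, it calculates the duration between the entry and the exit and inserts it into the durations table.

To do:

  1. Find two events in the events table with the same card_nr, one with device_type 1 and one with device_type 2.
  2. Calculate the duration between event 1 and event 2 and insert it into the durations table.
  3. Then use the properties table as the reference point for new imports.

In pictures:

  1. Events

  2. Function run, durations result

  3. Properties

The function is as follows: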

    CREATE OR REPLACE FUNCTION public.calculateduration()
 RETURNS void
 LANGUAGE plpgsql
AS $function$
DECLARE

     arrived_entry RECORD;
     departed_entry RECORD;  

     durationLimitDays INTEGER := -1;
     durationLimitDate TIMESTAMP := '1970-01-01 00:00:00';
     
     cursorQuery text;
     cursorEvent refcursor;
     
     maxEventTime TIMESTAMP;
     
BEGIN
    -- DELETE FROM durations;

    -- start date   = DURATION.LIMIT.DATE
    -- end date     = now - DURATION.LIMIT.DAYS
    SELECT PROP_VALUE INTO durationLimitDays FROM properties WHERE prop_key = 'DURATION.LIMIT.DAYS';
    SELECT PROP_VALUE INTO durationLimitDate FROM properties WHERE prop_key = 'DURATION.LIMIT.DATE';
    RAISE NOTICE 'Parameter duration limit days ''%'' , duration limit date ''%''', durationLimitDays, durationLimitDate;
    
    --  AND e.event_time < to_char(now(), ''YYYY-MM-DD'')::date - (''42 month''::interval)
    cursorQuery:='SELECT e.id, e.card_nr, e.event_time, e.ticket_type, e.device_type, e.manufacturer, e.carpark_id
                  FROM events e 
                  WHERE e.event_time >= ''' || durationLimitDate || '''
                  AND e.device_type IN (1,2) AND event_type=2
                  AND e.manufacturer like ''XX''
                  AND NOT EXISTS (SELECT d.event_id_arrival FROM durations d WHERE d.event_id_arrival = e.id)
                  AND NOT EXISTS (SELECT d.event_id_departure FROM durations d WHERE d.event_id_departure = e.id)
                  ORDER BY e.card_nr, e.event_time, e.carpark_id';
                  -- AND e.event_time < ''2016-01-05 00:00:00'' 
    OPEN cursorEvent SCROLL FOR EXECUTE cursorQuery;

    LOOP
        FETCH cursorEvent INTO arrived_entry;
        EXIT WHEN NOT FOUND;
        IF arrived_entry.device_type=1 THEN
            FETCH cursorEvent INTO departed_entry;
            EXIT WHEN NOT FOUND;
            
            -- same card number and same car park
            IF arrived_entry.card_nr=departed_entry.card_nr AND arrived_entry.carpark_id=departed_entry.carpark_id THEN
                IF departed_entry.device_type=2 THEN
                    EXECUTE 'INSERT INTO durations VALUES (nextval(''durations_id_seq''), ''' || current_timestamp || ''',' || arrived_entry.id || ',' || departed_entry.id || ',''' || arrived_entry.event_time || ''',''' || departed_entry.event_time 
                                                              || ''',''' || arrived_entry.card_nr || ''',' || arrived_entry.ticket_type || ',' 
                                                              || date_part('epoch', departed_entry.event_time::timestamp - arrived_entry.event_time::timestamp) || ', ''' || arrived_entry.manufacturer || ''',' 
                                                              || arrived_entry.carpark_id || ')';
                ELSE 
                    -- repeated entry found - refresh entry with the repeated one 
                    -- RAISE NOTICE 'Unexpected entry after entry found at event id ''%'' and card number''%''', arrived_entry.id, arrived_entry.card_nr;
                    FETCH PRIOR FROM cursorEvent INTO arrived_entry;
                END IF;
            ELSE
                -- card number or car park changed - refresh to changed card number / car park
                -- RAISE NOTICE 'Unexpected card number or car park change found at event id ''%'' and card number''%''', arrived_entry.id, arrived_entry.card_nr;
                FETCH PRIOR FROM cursorEvent INTO arrived_entry;
            END IF;
        END IF;
    END LOOP;
    CLOSE cursorEvent;
    
    -- update duration limit date ( = MAX(event_time) - durationLimitDays)
    EXECUTE 'SELECT MAX(event_time) FROM events WHERE event_time >= ''' || durationLimitDate || '''' INTO maxEventTime;
    SELECT (maxEventTime - (durationLimitDays ||' day')::interval) INTO durationLimitDate;
    EXECUTE 'UPDATE properties SET PROP_VALUE=''' || durationLimitDate || ''' WHERE prop_key =''DURATION.LIMIT.DATE''';
    RAISE NOTICE 'Add new DURATION.LIMIT.DATE to ''%''', durationLimitDate;
        
END;
$function$
;

Here is my attempt to replicate it, at least partially:

WITH x AS (select *
from events
match_recognize (
    partition by card_nr
    order by event_time
    measures match_number() as match_n
        , first(id) as event_id_arrival
        , last(id) as event_id_departure
        , first(event_time) as event_time_arrival
        , last(event_time) as event_time_departure
        , timestampdiff('second', first(event_time), last(event_time)) as duration
        , TICKET_TYPE AS TICKET_TYPE 
        , MANUFACTURER AS MANUFACTURER 
        , CARPARK_ID AS CARPARK_ID 
        , ODB_CREATED_AT AS ODB_CREATED_AT 
        , id AS id
    one row per match
    pattern(arr dep)
    define arr as device_type=1
        , dep as device_type=2
))
SELECT durationseq.nextval as id, ODB_CREATED_AT, event_id_arrival, event_id_departure, event_time_arrival, event_time_departure, CARD_NR, TICKET_TYPE, duration, MANUFACTURER, 
CARPARK_ID FROM x

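There are a few more things I still need to cover:

  1. I should always filter on manufacturer 'XX', because there are other manufacturers such as YY or ZZ.
  2. The select should only calculate durations for rows where events.event_time >= durationLimitDate, taken from properties.
  3. I need to make sure event_id_arrival and event_id_departure are not already in the target table, to avoid duplicates.
  4. After inserting the calculations into the durations table, I have to update durationLimitDate in properties, where durationLimitDate = MAX(event_time) - durationLimitDays.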
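
For item 4, something along the following lines might work. This is only a sketch, not tested against the real data. It assumes that PROP_VALUE for 'DURATION.LIMIT.DAYS' holds a number and PROP_VALUE for 'DURATION.LIMIT.DATE' holds a string that Snowflake can parse as a timestamp; the session variable names (limit_days, limit_date, max_event_time) are made up for illustration.

```sql
-- Sketch only: recompute DURATION.LIMIT.DATE as MAX(event_time) - DURATION.LIMIT.DAYS.
-- The variable names below are hypothetical and not part of the original function.

-- Read the two settings (assumed to be stored as parseable strings in PROP_VALUE).
SET limit_days = (SELECT TO_NUMBER(prop_value)
                  FROM properties
                  WHERE prop_key = 'DURATION.LIMIT.DAYS');

SET limit_date = (SELECT TO_TIMESTAMP_NTZ(prop_value)
                  FROM properties
                  WHERE prop_key = 'DURATION.LIMIT.DATE');

-- Latest event at or after the current limit date, as in the pgSQL function.
SET max_event_time = (SELECT MAX(event_time)
                      FROM events
                      WHERE event_time >= $limit_date);

-- Write the new limit date back as text, since PROP_VALUE is a VARCHAR column.
UPDATE properties
SET prop_value = TO_VARCHAR(DATEADD(day, -1 * $limit_days, $max_event_time),
                            'YYYY-MM-DD HH24:MI:SS.FF3')
WHERE prop_key = 'DURATION.LIMIT.DATE';
```

The case where no event satisfies the filter (so MAX(event_time) is NULL) is not handled here and would need an explicit check.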

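【Discussion】:

  • Could you help us with sample data and the desired result, so we can make sure we solve this correctly?
  • Hi @FelipeHoffa, I have just edited my post and added data for the expected input and output.

Tags: postgresql snowflake-cloud-data-platform plpgsql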


【Solution 1】:

Partial solution: these patterns are easy to find with MATCH_RECOGNIZE.

I tried it with the sample data provided, and the results look good:

select *
from events
match_recognize (
    partition by card_nr
    order by event_time
    measures match_number() as match_n
        , first(event_time) as event_time_arr
        , last(event_time) as event_time_dep
        , first(id) as event_id_arr
        , last(id) as event_id_dep
        , timestampdiff('second', first(event_time), last(event_time)) as duration
    one row per match
    pattern(arr dep)
    define arr as device_type=1
        , dep as device_type=2
);

Now you just need to insert these results into the correct target table as required.
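
As a rough sketch of that last step, the pattern match can feed an INSERT directly. It is untested against the real data and rests on a few assumptions: durations.id is filled by its autoincrement default, PROP_VALUE for 'DURATION.LIMIT.DATE' holds a string Snowflake can parse as a timestamp, and partitioning by carpark_id as well as card_nr is an acceptable stand-in for the same-car-park check in the original function.

```sql
-- Sketch only: pair each arrival with the departure that directly follows it
-- for the same card and car park, and insert only pairs not yet stored.
INSERT INTO durations (odb_created_at, event_id_arrival, event_id_departure,
                       event_time_arrival, event_time_departure,
                       card_nr, ticket_type, duration, manufacturer, carpark_id)
SELECT CURRENT_TIMESTAMP()::TIMESTAMP_NTZ,
       m.event_id_arrival,
       m.event_id_departure,
       m.event_time_arrival,
       m.event_time_departure,
       m.card_nr,
       m.ticket_type,
       m.duration,
       m.manufacturer,
       m.carpark_id
FROM (
    -- Candidate events: same filters as the cursor query in the pgSQL function.
    SELECT e.*
    FROM events e
    WHERE e.manufacturer = 'XX'
      AND e.event_type = 2
      AND e.device_type IN (1, 2)
      AND e.event_time >= (SELECT TO_TIMESTAMP_NTZ(prop_value)
                           FROM properties
                           WHERE prop_key = 'DURATION.LIMIT.DATE')
      -- Skip events that already appear in durations.
      AND NOT EXISTS (SELECT 1 FROM durations d WHERE d.event_id_arrival = e.id)
      AND NOT EXISTS (SELECT 1 FROM durations d WHERE d.event_id_departure = e.id)
) MATCH_RECOGNIZE (
    PARTITION BY card_nr, carpark_id
    ORDER BY event_time
    MEASURES
        FIRST(id)           AS event_id_arrival,
        LAST(id)            AS event_id_departure,
        FIRST(event_time)   AS event_time_arrival,
        LAST(event_time)    AS event_time_departure,
        FIRST(ticket_type)  AS ticket_type,
        FIRST(manufacturer) AS manufacturer,
        TIMESTAMPDIFF('second', FIRST(event_time), LAST(event_time)) AS duration
    ONE ROW PER MATCH
    PATTERN (arr dep)
    DEFINE arr AS device_type = 1,
           dep AS device_type = 2
) m;
```

Because a Snowflake UDF cannot run DML, a statement like this would more likely live in a stored procedure or a scheduled task than in a function.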

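【Discussion】:

  • Thank you very much for your help. I tweaked it a bit and added the columns I needed. There are a few more important things I need to cover; I have added them at the bottom of the post to keep the comments clean!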