【发布时间】:2021-10-19 17:47:29
【问题描述】:
我将我的 Postgres 数据库迁移到 Snowflake,但我很难将 pgSQL 函数转换为 Snowflake UDF。
在发布脚本之前,这里是我的函数应该做什么的摘要。 该函数使用我数据库中的三个表:
-- events: one row per recorded event.
-- device_type 1 marks an arrival (entry), device_type 2 a departure (exit).
CREATE TABLE IF NOT EXISTS events (
    id              BIGINT        NOT NULL AUTOINCREMENT START 1 INCREMENT 1 PRIMARY KEY,
    odb_created_at  TIMESTAMP_NTZ NOT NULL,
    event_time      TIMESTAMP_NTZ NOT NULL,
    device_type     INTEGER       NOT NULL,
    event_type      INTEGER       NOT NULL,
    ticket_type     INTEGER       NOT NULL,
    card_nr         VARCHAR(100),
    count           INTEGER       DEFAULT 1 NOT NULL,
    manufacturer    VARCHAR(200),
    carpark_id      BIGINT
);
-- durations: one row per matched arrival/departure pair taken from events.
-- event_id_arrival / event_id_departure hold the ids of the paired events rows;
-- duration is the time between the two events in seconds.
CREATE TABLE IF NOT EXISTS durations (
    id                    BIGINT        NOT NULL AUTOINCREMENT START 1 INCREMENT 1 PRIMARY KEY,
    odb_created_at        TIMESTAMP_NTZ NOT NULL,
    event_id_arrival      BIGINT,
    event_id_departure    BIGINT,
    event_time_arrival    TIMESTAMP_NTZ,
    event_time_departure  TIMESTAMP_NTZ,
    card_nr               VARCHAR(100),
    ticket_type           INTEGER,
    duration              INTEGER,
    manufacturer          VARCHAR(200),
    carpark_id            BIGINT
);
-- properties: key/value settings read and updated by the duration calculation
-- (keys used: DURATION.LIMIT.DAYS and DURATION.LIMIT.DATE).
-- Created with IF NOT EXISTS, like the other tables, so that re-running this
-- script does not drop the stored values.
CREATE TABLE IF NOT EXISTS PROPERTIES (
    PROP_KEY   VARCHAR(80) NOT NULL,
    PROP_VALUE VARCHAR(250),
    PRIMARY KEY (PROP_KEY)
);
事件表(events)汇总了所有入场或出场事件。 持续时间表(durations)包含与事件表相同的信息,此外还记录了入场与出场之间的持续时间。 属性表(properties)用作下一轮计算的参考。
已经计算过的条目不能再计算了,这就是为什么我们将事件表中的id插入为:
- event_id_arrival:如果它是入场事件(device_type 1)
- event_id_departure:如果它是出场事件(device_type 2)
以下是一些用于重新创建表格并填充表格的示例数据:
-- Sample events: five arrival (device_type 1) / departure (device_type 2) pairs.
INSERT INTO public.events
    (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id)
VALUES
    (188160996, '2021-10-02 04:28:26.338', '2021-10-01 09:14:41.32',  1, 2, 11,  '03998988030897300007782', 1, 'XX', 1852),
    (188160790, '2021-10-02 04:28:26.248', '2021-10-01 09:31:10.94',  2, 2, 11,  '03998988030897300007782', 1, 'XX', 1852),
    (188146489, '2021-10-02 04:26:55.069', '2021-10-01 10:03:01.57',  1, 2, 500, '01479804030429500089598', 1, 'XX', 1563),
    (188146069, '2021-10-02 04:26:54.852', '2021-10-01 11:49:58.45',  2, 2, 500, '01479804030429500089598', 1, 'XX', 1563),
    (188161161, '2021-10-02 04:28:26.372', '2021-10-01 18:44:33.62',  1, 2, 11,  '03998988030897300007782', 1, 'XX', 1852),
    (188160950, '2021-10-02 04:28:26.329', '2021-10-01 18:45:51.903', 2, 2, 11,  '03998988030897300007782', 1, 'XX', 1852),
    (188161227, '2021-10-02 04:28:26.374', '2021-10-01 23:21:18.58',  1, 2, 11,  '04139733030897300003136', 1, 'XX', 1852),
    (188160974, '2021-10-02 04:28:26.334', '2021-10-01 23:24:03.29',  2, 2, 11,  '04139733030897300003136', 1, 'XX', 1852),
    (188239864, '2021-10-03 04:24:43.345', '2021-10-02 06:49:55.97',  1, 2, 11,  '01719400030897300061410', 1, 'XX', 1852),
    (188239649, '2021-10-03 04:24:43.308', '2021-10-02 07:02:08.72',  2, 2, 11,  '01719400030897300061410', 1, 'XX', 1852);
一旦这些事件被填充,我们运行函数并在持续时间表中获取以下值。该函数找到匹配的条目并计算持续时间。之后,它将它们插入到持续时间表中并更新了属性表。
功能
我的函数在 events 表中搜索 card_nr 相同、且 device_type 分别为 1(= 入场)和 2(= 出场)的行。一旦找到匹配项,它就会计算入场事件与出场事件之间的持续时间,并将其插入到 durations 表中。
待办事项:
- 在 events 表中查找 card_nr 相同、device_type 分别为 1 和 2 的两个事件
- 计算事件 1 和事件 2 之间的持续时间,并将其插入到 durations 表中。
- 然后将属性表用作新导入的参考。
在图片中:
函数如下:
-- calculateduration(): pairs unprocessed arrival events (device_type 1) with
-- the departure event (device_type 2) that directly follows them for the same
-- card number and car park, stores each pair with its duration in seconds in
-- durations, and finally moves the DURATION.LIMIT.DATE property forward to
-- MAX(event_time) - DURATION.LIMIT.DAYS.
--
-- Reads:  events, durations, properties
-- Writes: durations (one row per matched pair), properties (DURATION.LIMIT.DATE)
-- Raises: an exception when either DURATION.LIMIT.* property is missing.
CREATE OR REPLACE FUNCTION public.calculateduration()
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    arrived_entry RECORD;
    departed_entry RECORD;
    durationLimitDays INTEGER := -1;
    durationLimitDate TIMESTAMP := '1970-01-01 00:00:00';
    cursorEvent refcursor;
    maxEventTime TIMESTAMP;
BEGIN
    -- start date = DURATION.LIMIT.DATE
    -- end date   = now - DURATION.LIMIT.DAYS
    -- A non-STRICT SELECT INTO sets the target to NULL when no row matches,
    -- so the declared defaults do not survive a missing property.
    SELECT PROP_VALUE INTO durationLimitDays FROM properties WHERE prop_key = 'DURATION.LIMIT.DAYS';
    SELECT PROP_VALUE INTO durationLimitDate FROM properties WHERE prop_key = 'DURATION.LIMIT.DATE';
    RAISE NOTICE 'Parameter duration limit days ''%'' , duration limit date ''%''', durationLimitDays, durationLimitDate;

    -- Fail with a clear message instead of an opaque error further down.
    IF durationLimitDays IS NULL OR durationLimitDate IS NULL THEN
        RAISE EXCEPTION 'Properties DURATION.LIMIT.DAYS and DURATION.LIMIT.DATE must both be set';
    END IF;

    -- Candidate events: manufacturer XX, entry/exit devices only, not yet
    -- referenced by any durations row. The ordering puts the events of one card
    -- next to each other in time order, which the pairing loop relies on.
    -- SCROLL is required because the loop steps backwards with FETCH PRIOR.
    OPEN cursorEvent SCROLL FOR
        SELECT e.id, e.card_nr, e.event_time, e.ticket_type, e.device_type, e.manufacturer, e.carpark_id
        FROM events e
        WHERE e.event_time >= durationLimitDate
          AND e.device_type IN (1, 2)
          AND e.event_type = 2
          AND e.manufacturer = 'XX'
          AND NOT EXISTS (SELECT 1 FROM durations d WHERE d.event_id_arrival = e.id)
          AND NOT EXISTS (SELECT 1 FROM durations d WHERE d.event_id_departure = e.id)
        ORDER BY e.card_nr, e.event_time, e.carpark_id;

    LOOP
        FETCH cursorEvent INTO arrived_entry;
        EXIT WHEN NOT FOUND;

        IF arrived_entry.device_type = 1 THEN
            FETCH cursorEvent INTO departed_entry;
            EXIT WHEN NOT FOUND;

            IF arrived_entry.card_nr = departed_entry.card_nr
               AND arrived_entry.carpark_id = departed_entry.carpark_id
               AND departed_entry.device_type = 2 THEN
                -- Same card, same car park, arrival followed by departure:
                -- store the pair. duration is the elapsed time in seconds,
                -- rounded to fit the integer column.
                INSERT INTO durations (
                    id, odb_created_at,
                    event_id_arrival, event_id_departure,
                    event_time_arrival, event_time_departure,
                    card_nr, ticket_type, duration, manufacturer, carpark_id
                ) VALUES (
                    nextval('durations_id_seq'), current_timestamp,
                    arrived_entry.id, departed_entry.id,
                    arrived_entry.event_time, departed_entry.event_time,
                    arrived_entry.card_nr, arrived_entry.ticket_type,
                    round(date_part('epoch', departed_entry.event_time - arrived_entry.event_time)::numeric)::integer,
                    arrived_entry.manufacturer, arrived_entry.carpark_id
                );
            ELSE
                -- Repeated arrival, or the card number / car park changed:
                -- step back one row so the next iteration re-reads the row
                -- just fetched as a potential arrival.
                FETCH PRIOR FROM cursorEvent INTO arrived_entry;
            END IF;
        END IF;
    END LOOP;
    CLOSE cursorEvent;

    -- update duration limit date ( = MAX(event_time) - durationLimitDays)
    SELECT MAX(e.event_time) INTO maxEventTime
    FROM events e
    WHERE e.event_time >= durationLimitDate;

    IF maxEventTime IS NULL THEN
        -- No events on or after the current limit: keep the stored limit date.
        RAISE NOTICE 'No events on or after ''%''; DURATION.LIMIT.DATE left unchanged', durationLimitDate;
        RETURN;
    END IF;

    durationLimitDate := maxEventTime - durationLimitDays * INTERVAL '1 day';
    UPDATE properties
    SET prop_value = durationLimitDate::text
    WHERE prop_key = 'DURATION.LIMIT.DATE';
    RAISE NOTICE 'Add new DURATION.LIMIT.DATE to ''%''', durationLimitDate;
END;
$function$
;
下面是我在 Snowflake 中复现(或至少部分复现)该函数的尝试:
-- Pairs each arrival (device_type 1) with the departure (device_type 2) that
-- directly follows it for the same card number AND the same car park, which is
-- the pairing rule of calculateduration(). One output row per matched pair.
-- NOTE: no manufacturer, date-limit or already-processed filter is applied here.
WITH matched_visit AS (
    SELECT
        card_nr,
        event_id_arrival,
        event_id_departure,
        event_time_arrival,
        event_time_departure,
        duration,
        ticket_type,
        manufacturer,
        carpark_id,
        odb_created_at
    FROM events
    MATCH_RECOGNIZE (
        PARTITION BY card_nr
        ORDER BY event_time, carpark_id
        MEASURES
            FIRST(id) AS event_id_arrival,
            LAST(id) AS event_id_departure,
            FIRST(event_time) AS event_time_arrival,
            LAST(event_time) AS event_time_departure,
            -- elapsed time between arrival and departure, in seconds
            TIMESTAMPDIFF('second', FIRST(event_time), LAST(event_time)) AS duration,
            -- descriptive columns come from the arrival row, as in calculateduration()
            FIRST(ticket_type) AS ticket_type,
            FIRST(manufacturer) AS manufacturer,
            FIRST(carpark_id) AS carpark_id,
            -- NOTE(review): calculateduration() stores current_timestamp in
            -- durations.odb_created_at; confirm which value is wanted here.
            odb_created_at AS odb_created_at
        ONE ROW PER MATCH
        PATTERN (arr dep)
        DEFINE
            arr AS device_type = 1,
            -- the departure must be in the same car park as the row before it (the arrival)
            dep AS device_type = 2 AND carpark_id = LAG(carpark_id)
    )
)
SELECT
    durationseq.nextval AS id,
    odb_created_at,
    event_id_arrival,
    event_id_departure,
    event_time_arrival,
    event_time_departure,
    card_nr,
    ticket_type,
    duration,
    manufacturer,
    carpark_id
FROM matched_visit;
我还需要介绍几件事:
- 我应该过滤制造商始终为“XX”,因为有像 YY 或 ZZ 这样的制造商
- 在我们的选择中,我们应该只计算 events.event_time >= durationLimitDate(取自属性表)的事件的持续时间
- 我需要确保 event_id_arrival 和 event_id_departure 不在目标表中以避免重复
- 将计算结果插入持续时间表后,我必须更新属性表中的 durationLimitDate,其中 durationLimitDate = MAX(event_time) - durationLimitDays
【问题讨论】:
-
您能帮助我们创建示例数据和期望的结果,以确保我们能够正确解决这个问题吗?
-
嗨@FelipeHoffa 我刚刚编辑了我的帖子并添加了预期输入和输出的数据
标签: postgresql snowflake-cloud-data-platform plpgsql