【Question Title】:Snowflake Tasks - Visualizing hierarchy
【Posted】:2023-12-11 23:36:01
【Question】:

Snowflake allows creating a chain of tasks with the AFTER syntax.

CREATE TASK

AFTER string

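Specifies the predecessor task for the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag).

This parameter makes it possible to define a simple tree of tasks; i.e. a set of tasks organized by their dependencies. In this context, a tree is a series of tasks that start with a scheduled root task and are linked together by their dependencies.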

Suppose we have the following:

CREATE DATABASE TEST;
CREATE WAREHOUSE Developer WITH WAREHOUSE_SIZE = 'XSMALL' 
       WAREHOUSE_TYPE = 'STANDARD';
CREATE SCHEMA TEST;

CREATE OR REPLACE TASK task1 WAREHOUSE = Developer SCHEDULE = '10 minute'
AS SELECT system$wait(20);

CREATE OR REPLACE TASK task2 WAREHOUSE = Developer AFTER task1
AS SELECT system$wait(30);

CREATE OR REPLACE TASK task3 WAREHOUSE = Developer AFTER task2
AS SELECT system$wait(60);

CREATE OR REPLACE TASK task4 WAREHOUSE = Developer AFTER task1
AS SELECT system$wait(20);

CREATE OR REPLACE TASK task5 WAREHOUSE = Developer AFTER task1
AS SELECT system$wait(30);

CREATE OR REPLACE TASK task6 WAREHOUSE = Developer AFTER task3
AS SELECT system$wait(40);

CREATE OR REPLACE TASK task7 WAREHOUSE = Developer AFTER task5
AS SELECT system$wait(50);

CREATE OR REPLACE TASK task8 WAREHOUSE = Developer AFTER task5
AS SELECT system$wait(30);

The goal is to get a graphical representation of the tasks, for a quick overview or for documentation.

【Question Comments】:

    Tags: snowflake-cloud-data-platform recursive-query


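    【Solution 1】:

    Snowflake provides the TASK_DEPENDENTS table function:

    This table function returns the list of child tasks for a given root (i.e. parent) task in a simple tree of tasks.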

    SELECT CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) AS TASK_NAME, PREDECESSOR
    FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(TASK_NAME => 'task1',
                                                  RECURSIVE => TRUE ));
    /*
    TASK_NAME        PREDECESSOR
    TEST.TEST.TASK1 
    TEST.TEST.TASK2 TEST.TEST.TASK1
    TEST.TEST.TASK4 TEST.TEST.TASK1
    TEST.TEST.TASK5 TEST.TEST.TASK1
    TEST.TEST.TASK3 TEST.TEST.TASK2
    TEST.TEST.TASK7 TEST.TEST.TASK5
    TEST.TEST.TASK8 TEST.TEST.TASK5
    TEST.TEST.TASK6 TEST.TEST.TASK3
    */
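
    For comparison, here is a small sketch of the same call with RECURSIVE => FALSE. As far as I understand the function, this limits the result to the direct children of the root task instead of the whole tree, which is handy for checking a single level. The task name 'task1' and the PREDECESSOR column are taken from the query above; nothing else is assumed.

    ```sql
    -- Direct children only: RECURSIVE => FALSE stops at the first level below task1
    SELECT CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) AS TASK_NAME,
           PREDECESSOR
    FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(TASK_NAME => 'task1',
                                                  RECURSIVE => FALSE));
    ```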
    

    Borrowing the "Diagrams as Code" idea from Mermaid, we can generate a flowchart as follows:

    WITH RECURSIVE cte AS (
       SELECT CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) AS TASK_NAME, *
       FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
                       TASK_NAME => 'task1', RECURSIVE => TRUE )) 
                                 -- here goes task name
    ), rec AS (
       SELECT 
          0 AS lvl, cte.TASK_NAME, cte.PREDECESSOR,
          REPLACE(REPLACE(REPLACE(
            'ROOT{.} -- "SCHEDULE: <schedule>;CONDITION: <condition>" --> <root>'
           ,'<schedule>', COALESCE(cte.SCHEDULE, '<none>'))
           ,'<condition>', COALESCE(cte.CONDITION,'<none>'))
           ,'<root>', cte.TASK_NAME)  AS GRAPH_ENTRY
       FROM cte 
       WHERE PREDECESSOR IS NULL
       UNION ALL
       SELECT rec.lvl + 1 AS lvl, cte.TASK_NAME, cte.PREDECESSOR,
              REPLACE(REPLACE('<T1> --> <T2>'
              ,'<T1>', cte.PREDECESSOR)
              ,'<T2>', cte.TASK_NAME) AS GRAPH_ENTRY
       FROM rec
       JOIN cte ON rec.TASK_NAME = cte.PREDECESSOR
    )
    SELECT 'graph TD' || CHAR(13) || 
           LISTAGG(CHAR(9) || GRAPH_ENTRY || CHAR(13), '') 
                   WITHIN GROUP(ORDER BY lvl) AS flow_chart
    FROM rec;
    

    This produces the following output:

    graph TD
        ROOT{.} -- "SCHEDULE: 10 minute;CONDITION: <none>" --> TEST.TEST.TASK1
        TEST.TEST.TASK1 --> TEST.TEST.TASK2
        TEST.TEST.TASK1 --> TEST.TEST.TASK4
        TEST.TEST.TASK1 --> TEST.TEST.TASK5
        TEST.TEST.TASK2 --> TEST.TEST.TASK3
        TEST.TEST.TASK5 --> TEST.TEST.TASK7
        TEST.TEST.TASK5 --> TEST.TEST.TASK8
        TEST.TEST.TASK3 --> TEST.TEST.TASK6
    

    The output can be visualized with the Mermaid Live Editor:

    Mermaid Flow chart - LiveDemo
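
    If ordering the edges by level does not matter (Mermaid lays out the graph from the edges regardless of the order in which they are listed), the recursive CTE can be skipped, because TASK_DEPENDENTS already returns one row per (task, predecessor) pair. The following is a minimal sketch under two assumptions: it relies on the PREDECESSOR column used in the queries above (an account that exposes a PREDECESSORS array instead would need the array flattened first), and it joins lines with CHAR(10), a line feed, rather than CHAR(13). It also leaves out the ROOT node that carries the schedule and condition.

    ```sql
    -- One Mermaid edge per dependency, without recursion
    SELECT 'graph TD' || CHAR(10) ||
           LISTAGG(CHAR(9) || PREDECESSOR || ' --> ' ||
                   CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) || CHAR(10), '')
             WITHIN GROUP (ORDER BY PREDECESSOR, NAME) AS flow_chart
    FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(TASK_NAME => 'task1',
                                                  RECURSIVE => TRUE))
    WHERE PREDECESSOR IS NOT NULL;  -- the root task has no incoming edge
    ```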


    Bonus: the same approach can also be used to visualize the execution history as a Gantt chart:

    Enable all tasks:

    ALTER TASK TEST.TEST.TASK8 RESUME;
    ALTER TASK TEST.TEST.TASK7 RESUME;
    ALTER TASK TEST.TEST.TASK6 RESUME;
    ALTER TASK TEST.TEST.TASK5 RESUME;
    ALTER TASK TEST.TEST.TASK4 RESUME;
    ALTER TASK TEST.TEST.TASK3 RESUME;
    ALTER TASK TEST.TEST.TASK2 RESUME;
    ALTER TASK TEST.TEST.TASK1 RESUME;
    SHOW TASKS;
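
    The statements above resume the child tasks first and the root task last. As an alternative sketch, Snowflake also has a system function, SYSTEM$TASK_DEPENDENTS_ENABLE, which to my understanding resumes a root task together with all of its dependents in one call. The fully qualified task name below assumes the TEST.TEST schema from the question.

    ```sql
    -- Resume the root task and its dependent tasks in one call
    SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('TEST.TEST.TASK1');

    -- Check the "state" column: each task should now show as started
    SHOW TASKS IN SCHEMA TEST.TEST;

    -- When the demo is over, suspend the root task so the schedule stops firing
    ALTER TASK TEST.TEST.TASK1 SUSPEND;
    ```

    Suspending the root task afterwards keeps the 10-minute schedule from continuing to use the warehouse.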
    

    Generate the Gantt chart:

    SELECT 
      CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) AS TASK_NAME,
      QUERY_START_TIME,
      COMPLETED_TIME,
      DATEDIFF(SECOND, QUERY_START_TIME,  COMPLETED_TIME) AS DURATION_SEC,
      TASK_NAME || ':' || TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH:MI:SS') 
                || ',' || DURATION_SEC || 's' AS GRAPH_ENTRY,
      s.gantt || LISTAGG(CHAR(9) || GRAPH_ENTRY || CHAR(13), '') 
             WITHIN GROUP(ORDER BY QUERY_START_TIME) OVER() AS graph
    FROM TABLE(information_schema.task_history(
               scheduled_time_range_start=>'2021-05-16 07:00:00.000'::TIMESTAMP_LTZ))
    ,LATERAL(SELECT REPLACE(
    'gantt
        title Task execution
        dateFormat YYYY-MM-DD HH:mm:ss
        axisFormat  %Y-%m-%d %H:%M
        section RunId=<run_id>
    '
    ,'<run_id>'
    ,RUN_ID)
    ) s(gantt)
    WHERE STATE = 'SUCCEEDED'
      --AND RUN_ID = x
    ORDER BY scheduled_time;
    

    Output:

    gantt
        title Task execution
        dateFormat YYYY-MM-DD HH:mm:ss
        axisFormat  %Y-%m-%d %H:%M
        section RunId=xxxxxx
        TEST.TEST.TASK1:2021-05-16 07:13:45,20s
        TEST.TEST.TASK5:2021-05-16 07:14:06,31s
        TEST.TEST.TASK4:2021-05-16 07:14:09,21s
        TEST.TEST.TASK2:2021-05-16 07:14:15,30s
        TEST.TEST.TASK8:2021-05-16 07:14:51,34s
        TEST.TEST.TASK7:2021-05-16 07:14:51,50s
        TEST.TEST.TASK3:2021-05-16 07:15:01,60s
        TEST.TEST.TASK6:2021-05-16 07:16:15,40s
    

    Mermaid Gantt diagram Live Demo
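
    The Gantt query contains a commented-out RUN_ID filter, and its start time is hard-coded. A hedged sketch for finding a suitable RUN_ID first: list the recent task runs with a relative time window, then copy the RUN_ID of the run you want into that filter. The one-hour window and the limit of 100 rows are arbitrary choices for illustration.

    ```sql
    -- Recent task runs, oldest first; pick a RUN_ID from the result
    SELECT NAME, STATE, RUN_ID, SCHEDULED_TIME, QUERY_START_TIME, COMPLETED_TIME
    FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
           SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
           RESULT_LIMIT => 100))
    ORDER BY SCHEDULED_TIME;
    ```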

    【Comments】: