【问题标题】:Copy latest file from S3 stage to Snowflake table using COPY command使用 COPY 命令将最新文件从 S3 阶段复制到雪花表
【发布时间】:2021-08-10 12:55:28
【问题描述】:

我有一个按日期划分的 S3 阶段,我希望每天每小时都有一个文件。我只想使用 COPY 命令从 S3 阶段选择最新文件。

如何指定复制命令以仅选择最新文件?我读到雪花将加载元数据的历史记录保留 64 天,以避免加载相同的文件。但我想知道是否有任何方法可以通过 COPY 命令仅选择最新文件。

FILE_FORMAT=(type=csv 
            compression=gzip 
            field_delimiter=','  
            skip_header=1 
            field_optionally_enclosed_by='\"' 
            empty_field_as_null=true 
            NULL_IF = ('NULL','null','') 
            date_format='yyyy-mm-dd' time_format='hh24:mi:ss.ff' 
            )

【问题讨论】:

    标签: amazon-s3 snowflake-cloud-data-platform


    【解决方案1】:

    我认为您将需要一个存储过程来执行此操作。我从我写的一个项目中提取了一些代码以批量加载文件https://github.com/GregPavlik/snowflake_bulk_loader/blob/master/02.%20Bulk%20Load%20-%20Runtime.sql

    第一个挑战是文件的 LIST 命令返回的日期格式与标准时间戳文字格式不同。有一个 UDF 可以从上次修改时间转换为雪花时间戳。第二个挑战是 LIST 命令返回路径中的阶段名称,但方式略有不同,具体取决于它是内部还是外部以及外部取决于云主机。还有另一个 UDF 从路径中去除舞台名称。

    SP 从那里列出文件,获取最新的文件,然后发出复制命令。您可以按照它在做什么,并将适当的点更改为您的阶段名称、表名称和复制命令。

    这假设您使用的是命名阶段(外部阶段被命名为使用 @ 引用的内部阶段)。如果没有,您可以调整以@为前缀的阶段名称的代码。

    create or replace function LAST_MODIFIED_TO_TIMESTAMP(LAST_MODIFIED string) 
    returns timestamp_tz
    as
    $$
        to_timestamp_tz(left(LAST_MODIFIED, len(LAST_MODIFIED) - 4) || ' ' || '00:00', 'DY, DD MON YYYY HH:MI:SS TZH:TZM')
    $$;
    
    
    create or replace function STAGE_PATH_SHORTEN(FILE_PATH string)
    returns string
    language javascript
    as
    $$
        /*
            Removes the cloud provider prefix and stage name from the file path
        */
        var s3 = FILE_PATH.search(/s3:\/\//i);
    
        if ( s3 != -1){
            return FILE_PATH.substring(FILE_PATH.indexOf("/", s3 + 5) + 1);
        }
    
        var azure = FILE_PATH.search(/azure:\/\//i);
    
        if ( azure != -1){
            return FILE_PATH.substring(FILE_PATH.indexOf("/", azure + 8) + 1);
        }
    
        var newStyleInternal = FILE_PATH.search(/stages\//i);
    
        if (newStyleInternal != -1){
            return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal + 7) + 1);
        }
    
        var newStyleInternal = FILE_PATH.search(/stages[a-zA-Z0-9]{4,10}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\//i);
    
        if (newStyleInternal != -1){
            return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal) + 1);
        }
    
        var stageRegExp = "/";
        var re = new RegExp(stageRegExp, "i");
    
        var stageInStr = FILE_PATH.search(re);
    
        if (stageInStr != -1){
            return FILE_PATH.substring(FILE_PATH.indexOf("/", stageInStr) + 1);
        }
    
        throw "Unknown file path type."
    $$;
    
    
    create or replace procedure INGEST_MOST_RECENT_FILE(STAGE_NAME string, TARGET_TABLE string)
    returns string
    language javascript
    execute as caller
    as
    $$
    
    try{
    
        getResultSet(`list @${STAGE_NAME}`);
        var fileName = executeSingleValueQuery('FILE_NAME', 
                                               `select stage_path_shorten("name") as FILE_NAME, LAST_MODIFIED_TO_TIMESTAMP("last_modified") as LAST_MODIFIED_TS 
                                                from table(result_scan(last_query_id())) order by LAST_MODIFIED_TS desc limit 1;`);
        
        // Modify with your COPY INTO statement here, but leave the last part -- files=('${fileName}') -- unmodified.
        var copyStatus = executeSingleValueQuery("status",
    
    `
    copy into ${TARGET_TABLE} from @${STAGE_NAME} file_format=(type=CSV) files=('${fileName}') ;
    `
    
        );
        
        return `Copy status: ${copyStatus}.`;
    
    } catch (err) {
        return `Error: ${err.message}.`;
    }
    
    function getResultSet(sql){
        cmd = {sqlText: sql};
        stmt = snowflake.createStatement(cmd);
        var rs;
        rs = stmt.execute();
        return rs;
    }
    
    function executeSingleValueQuery(columnName, queryString) {
        var out;
        cmd1 = {sqlText: queryString};
        stmt = snowflake.createStatement(cmd1);
        var rs;
        try{
            rs = stmt.execute();
            rs.next();
            return rs.getColumnValue(columnName);
        }
        catch(err) {
            if (err.message.substring(0, 18) == "ResultSet is empty"){
                throw "ERROR: No rows returned in query.";
            } else {
                throw "ERROR: " + err.message.replace(/\n/g, " ");
            } 
        }
        return out;
    }
    
    $$;
    
    call ingest_most_recent_file('TEST_STAGE', 'TARGET_TABLE');
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-28
      • 2020-10-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多