我认为您需要一个存储过程来完成此操作。下面的代码摘自我编写的一个用于批量加载文件的项目:https://github.com/GregPavlik/snowflake_bulk_loader/blob/master/02.%20Bulk%20Load%20-%20Runtime.sql
第一个挑战是:LIST 命令返回的文件日期格式与标准的时间戳字面量格式不同,因此有一个 UDF 用来把 last_modified 转换为 Snowflake 时间戳。第二个挑战是:LIST 命令返回的路径中包含 stage 名称,但形式略有不同,具体取决于它是内部 stage 还是外部 stage,而外部 stage 又取决于所用的云服务商。因此还有另一个 UDF 用来从路径中去除 stage 名称。
在此基础上,存储过程(SP)先列出文件,取得最新的文件,然后执行 COPY 命令。您可以顺着代码了解它的执行过程,并在相应位置改成您自己的 stage 名称、表名称和 COPY 命令。
这里假设您使用的是命名 stage(外部 stage 都是命名的,并且与内部命名 stage 一样使用 @ 引用)。如果不是,您可以调整代码中给 stage 名称加 @ 前缀的部分。
-- Converts a "last_modified" text value (as reported by the LIST command) into a TIMESTAMP_TZ.
--
-- The last four characters of the input are dropped and ' 00:00' is appended, so that
-- the remaining text can be parsed with the TZH:TZM elements of the format string.
-- NOTE(review): this presumes the value ends in a 4-character suffix such as ' GMT' and
-- that the time is expressed in UTC -- confirm against actual LIST output.
--
-- Parameters:
--   LAST_MODIFIED  text such as 'Thu, 05 Mar 2020 18:04:33 GMT'
-- Returns:
--   the parsed instant as TIMESTAMP_TZ with a +00:00 offset
create or replace function LAST_MODIFIED_TO_TIMESTAMP(LAST_MODIFIED string)
returns timestamp_tz
as
$$
    to_timestamp_tz(
        left(LAST_MODIFIED, length(LAST_MODIFIED) - 4) || ' 00:00',
        'DY, DD MON YYYY HH:MI:SS TZH:TZM'
    )
$$;
-- Strips the cloud-provider prefix / stage name from a file path reported by LIST,
-- returning the remainder of the path.
--
-- Parameters:
--   FILE_PATH  the "name" value of a LIST result row
-- Returns:
--   the part of FILE_PATH that follows the recognised prefix
-- Raises:
--   "Unknown file path type." when the path contains none of the known prefixes
--   and no "/" at all.
create or replace function STAGE_PATH_SHORTEN(FILE_PATH string)
returns string
language javascript
as
$$
    /*
        Removes the cloud provider prefix and stage name from the file path.

        The checks run in a fixed order and the first match wins:
          1. s3://      -> everything after the first "/" that follows the scheme
          2. azure://   -> everything after the first "/" that follows the scheme
          3. gcs://     -> everything after the first "/" that follows the scheme
          4. "stages/"  -> everything after the next "/" that follows "stages/"
          5. "stages" immediately followed by a hyphenated id and "/"
                        -> everything after that "/"
          6. otherwise  -> everything after the first "/" in the path

        NOTE(review): for the URL schemes only the first path segment after the
        scheme is removed; if the stage URL itself contains sub-folders they
        remain in the result -- verify against how the stage was defined.
    */

    // The numeric offsets below are the lengths of the matched scheme text
    // ("s3://" = 5, "azure://" = 8, "gcs://" = 6), so the search for the next
    // "/" starts right after the scheme.
    var s3 = FILE_PATH.search(/s3:\/\//i);
    if (s3 != -1) {
        return FILE_PATH.substring(FILE_PATH.indexOf("/", s3 + 5) + 1);
    }

    var azure = FILE_PATH.search(/azure:\/\//i);
    if (azure != -1) {
        return FILE_PATH.substring(FILE_PATH.indexOf("/", azure + 8) + 1);
    }

    // Without this branch a gcs:// path would reach the generic fallback and
    // come back as "/bucket/..." (cut at the first slash of the scheme).
    var gcs = FILE_PATH.search(/gcs:\/\//i);
    if (gcs != -1) {
        return FILE_PATH.substring(FILE_PATH.indexOf("/", gcs + 6) + 1);
    }

    // "stages/" is 7 characters; skip it and cut after the following "/".
    var newStyleInternal = FILE_PATH.search(/stages\//i);
    if (newStyleInternal != -1) {
        return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal + 7) + 1);
    }

    // "stages" directly followed by a hyphen-separated id, then "/".
    var internalWithId = FILE_PATH.search(/stages[a-zA-Z0-9]{4,10}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\//i);
    if (internalWithId != -1) {
        return FILE_PATH.substring(FILE_PATH.indexOf("/", internalWithId) + 1);
    }

    // Generic fallback: treat everything up to the first "/" as the stage name.
    var firstSlash = FILE_PATH.indexOf("/");
    if (firstSlash != -1) {
        return FILE_PATH.substring(firstSlash + 1);
    }

    throw "Unknown file path type.";
$$;
-- Copies the most recently modified file of a named stage into a table.
--
-- Parameters:
--   STAGE_NAME    name of the stage, without the leading "@"
--   TARGET_TABLE  name of the table that COPY INTO loads
-- Returns:
--   'Copy status: <status>.' with the "status" column of the COPY INTO result, or
--   'Error: <message>.' when any step fails (errors are reported, not raised).
create or replace procedure INGEST_MOST_RECENT_FILE(STAGE_NAME string, TARGET_TABLE string)
returns string
language javascript
execute as caller
as
$$
    /*
        Steps:
          1. LIST the stage.
          2. Read the LIST output through result_scan(last_query_id()) and pick the
             file with the greatest last-modified timestamp.
          3. COPY that single file into the target table.

        NOTE(review): STAGE_NAME and TARGET_TABLE are interpolated into the SQL
        text as-is. Only pass trusted identifiers.
    */
    try {
        // Must run immediately before the next query: that query reads this
        // statement's output via last_query_id().
        getResultSet(`list @${STAGE_NAME}`);

        var fileName = executeSingleValueQuery('FILE_NAME',
            `select stage_path_shorten("name") as FILE_NAME, LAST_MODIFIED_TO_TIMESTAMP("last_modified") as LAST_MODIFIED_TS
             from table(result_scan(last_query_id())) order by LAST_MODIFIED_TS desc limit 1;`);

        // The file name ends up inside a single-quoted SQL literal, so double any
        // single quote it contains.
        var fileLiteral = String(fileName).replace(/'/g, "''");

        // Modify with your COPY INTO statement here, but leave the last part -- files=('${fileLiteral}') -- unmodified.
        var copyStatus = executeSingleValueQuery("status",
            `
            copy into ${TARGET_TABLE} from @${STAGE_NAME} file_format=(type=CSV) files=('${fileLiteral}') ;
            `
        );

        return `Copy status: ${copyStatus}.`;
    } catch (err) {
        // Works for Error objects as well as for plain thrown values.
        var detail = (err && err.message !== undefined) ? err.message : String(err);
        return `Error: ${detail}.`;
    }

    // Executes a SQL statement and returns its result set.
    function getResultSet(sql) {
        var stmt = snowflake.createStatement({sqlText: sql});
        return stmt.execute();
    }

    // Executes a query and returns the value of columnName from its first row.
    // Throws an Error when the query fails or returns no rows.
    function executeSingleValueQuery(columnName, queryString) {
        var stmt = snowflake.createStatement({sqlText: queryString});
        var hasRow;
        try {
            var rs = stmt.execute();
            hasRow = rs.next();
            if (hasRow) {
                return rs.getColumnValue(columnName);
            }
        } catch (err) {
            // Flatten multi-line messages so the returned status stays on one line.
            var text = (err && err.message !== undefined) ? err.message : String(err);
            throw new Error(String(text).replace(/\n/g, " "));
        }
        throw new Error("No rows returned in query");
    }
$$;
-- Example invocation: replace TEST_STAGE and TARGET_TABLE with your own stage and table names.
call INGEST_MOST_RECENT_FILE('TEST_STAGE', 'TARGET_TABLE');