【问题标题】:Bulk updating existing rows in Redshift批量更新 Redshift 中的现有行
【发布时间】:2014-04-27 21:29:42
【问题描述】:

这看起来应该很容易,但实际上并非如此。我正在将查询从 MySQL 迁移到表单的 Redshift:

INSERT INTO table
(...)
VALUES
(...)
ON DUPLICATE KEY UPDATE
  value = MIN(value, VALUES(value))

对于我们要插入的尚未在表中的主键,它们只是被插入。对于表中已经存在的主键,我们会根据取决于行中现有值和新值的条件更新行的值。

http://docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html 不起作用,因为在我的情况下 filter_expression 取决于表中的当前条目。我目前正在创建一个临时表,使用COPY 语句插入其中,并试图找出合并临时表和真实表的最佳方法。

【问题讨论】:

标签: sql postgresql amazon-redshift


【解决方案1】:

我现在必须为一个项目做这件事。我使用的方法包括 3 个步骤:

1.

运行更新以处理更改的字段(我正在更新字段是否已更改,但您当然可以限定):

update table1 set col1=s.col1, col2=s.col2,...
from table1 t
 join stagetable s on s.primkey=t.primkey;

2.

运行处理新记录的插入:

insert into table1
select s.* 
from stagetable s 
 left outer join table1 t on s.primkey=t.primkey
where t.primkey is null;

3.

将源中不再存在的行标记为非活动(我们的报告工具使用过滤非活动记录的视图):

update table1 
set is_active_flag='N', last_updated=sysdate
from table1 t
 left outer join stagetable s on s.primkey=t.primkey
where s.primkey is null;

【讨论】:

  • 由于某种原因,第三个查询返回“错误:目标表必须是等值连接谓词的一部分”。
  • 它以前工作过,但我发现现在工作了:update table1 set is_active_flag='N', last_updated=sysdate from (select t.primkey from table1 t left outer join stagetable s on s. primkey=t.primkey where s.primkey is null ) a where table1.primkey = a.primkey;
  • 更新更改然后插入新行比删除更改的行并插入所有(更改的和新的)更好??由于 redshift 中的更新操作在幕后是一个删除和插入操作..
【解决方案2】:

可以创建临时表。在 redshift 中最好删除和插入记录。 检查此文档

http://docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html

【讨论】:

    【解决方案3】:

    这是 Redshift 的完整工作方法。

    假设:

    A.S3 中的数据以 gunzip 格式提供,带有 '|' 分隔列,可能有一些垃圾数据,请参阅 ma​​xerror

    B.Sales 事实带有两个维度表以保持简单(TIME 和 SKU(SKU 可能有许多组和类别))。

    C.你有这样的销售表。

    CREATE TABLE sales (
     sku_id int encode zstd,
     date_id int encode zstd,
    quantity numeric(10,2) encode delta32k,
    );
    

    1) 创建 Staging 表,该表应类似于应用程序使用的 Online Table。

    CREATE TABLE stg_sales_onetime (
     sku_number varchar(255) encode zstd,
     time varchar(255) encode zstd,
     qty_str varchar(20) encode zstd,
     quantity numeric(10,2) encode delta32k,
     sku_id int encode zstd,
     date_id int encode zstd
    );
    

    2)从 S3 复制数据(这可以使用 SSH 完成)。

    copy stg_sales_onetime (sku_number,time,qty_str) from 
      's3://<buecket_name>/<full_file_path>' CREDENTIALS 'aws_access_key_id=<your_key>;aws_secret_access_key=<your_secret>' delimiter '|' ignoreheader 1 maxerror as 1000 gzip;
    

    3)此步骤是可选的,如果您没有良好的格式化数据,如果需要,这是您的转换步骤(如将 String(12.555654) 数量转换为 Number(12.56))

    update  stg_sales_onetime set quantity=convert(decimal(10,2),qty_str);
    

    4)从维度表中填充正确的 ID。

    update  stg_sales_onetime set sku_id=<your_sku_demesion_table>.sku_id  from <your_sku_demesion_table> where stg_sales_onetime.sku_number=<your_sku_demesion_table>.sku_number;
    update  stg_sales_onetime set time_id=<your_time_demesion_table>.time_id  from <your_time_demesion_table> where stg_sales_onetime.time=<your_time_demesion_table>.time;
    

    5)您终于有了从 Staging 到 Online Sales 表的数据。

    insert into sales(sku_id,time_id,quantity)  select sku_id,time_id,quantity from stg_sales_onetime;
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多