【问题标题】:Upsert from AWS Glue to Amazon Redshift从 AWS Glue 向上插入到 Amazon Redshift
【发布时间】:2018-09-19 00:03:22
【问题描述】:

我知道没有直接的 UPSERT 查询可以直接从 Glue 执行到 Redshift。是否可以在胶水脚本本身内实现临时表概念?

所以我的期望是创建临时表,将其与目标表合并,最后将其删除。可以在 Glue 脚本中实现吗?

【问题讨论】:

  • 如果你已经有一个 redshift 集群,我会考虑使用频谱,以便从 redshift 中访问胶水的外部表——我已经使用它来基本上消除我的临时“暂存”表的需要工作流...我直接在执行 upserts 的 sql 语句中查询外部表。我使用 aws lambda 或 aws batch 来执行 sql 语句,具体取决于我期望的运行时间。

标签: amazon-web-services amazon-redshift aws-glue


【解决方案1】:

通过将“postactions”选项传递给 JDBC 接收器,可以使用 Glue 中的临时表在 Redshift 中实现 upsert:

val destinationTable = "upsert_test"
val destination = s"dev_sandbox.${destinationTable}"
val staging = s"dev_sandbox.${destinationTable}_staging"

val fields = datasetDf.toDF().columns.mkString(",")

val postActions =
  s"""
     DELETE FROM $destination USING $staging AS S
        WHERE $destinationTable.id = S.id
          AND $destinationTable.date = S.date;
     INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
     DROP TABLE IF EXISTS $staging
  """

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "postactions" -> postActions
  )),
  redshiftTmpDir = s"$tempDir/redshift",
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)

确保用于写入 Redshift 的用户有足够的权限在暂存模式中创建/删除表。

【讨论】:

    【解决方案2】:

    是的,这是完全可以实现的。您所需要的只是将 pg8000 模块导入到胶水作业中。 pg8000 模块是用于连接 Amazon Redshift 并通过游标执行 SQL 查询的 python 库。 Python 模块参考:https://github.com/mfenniak/pg8000 然后,通过pg8000.connect(user='user',database='dbname',host='hosturl',port=5439,password='urpasswrd') 连接到您的目标集群 并使用 Glue,s datasink 选项加载到临时表中,然后使用 pg8000 游标运行 upsert sql 查询

    >>> import pg8000
    >>> conn = pg8000.connect(user='user',database='dbname',host='hosturl',port=5439,password='urpasswrd')
    >>> cursor = conn.cursor()
    >>> cursor.execute("CREATE TEMPORARY TABLE book (id SERIAL, title TEXT)")
    >>> cursor.execute("INSERT INTO TABLE final_target"))
    >>> conn.commit()
    

    您需要压缩 pg8000 包并将其放入 s3 存储桶中,并将其引用到 Glue Job 部分的 Advanced options/Job parameters 下的 Python Libraries 路径。

    【讨论】:

      【解决方案3】:

      显然glueContext.write_dynamic_frame.from_jdbc_conf函数中的connection_options字典参数有2个有趣的参数:preactionspostactions

      target_table = "my_schema.my_table"
      stage_table = "my_schema.#my_table_stage_table"
      
      
      pre_query = """
          drop table if exists {stage_table};
          create table {stage_table} as select * from {target_table} LIMIT 0;""".format(stage_table=stage_table, target_table=target_table)
      
      post_query = """
          begin;
          delete from {target_table} using {stage_table} where {stage_table}.id = {target_table}.id ; 
          insert into {target_table} select * from {stage_table}; 
          drop table {stage_table}; 
          end;""".format(stage_table=stage_table, target_table=target_table)
          
      datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
          frame = datasource0, catalog_connection ="test_red", redshift_tmp_dir='s3://s3path', transformation_ctx="datasink4",
          connection_options = {"preactions": pre_query, "postactions": post_query, 
                                "dbtable": stage_table, "database": "redshiftdb"})
      

      基于https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/

      【讨论】:

      • 如果你按原样使用它,它实际上不起作用。 stage_table 将在帧编写器运行预查询之前创建。所以这是首先写入 stage_table,然后删除它,然后像 target_table 一样重新创建它(如果它不存在,它会出错)
      猜你喜欢
      • 2018-11-03
      • 1970-01-01
      • 2020-02-04
      • 2023-03-13
      • 2018-01-30
      • 2019-02-23
      • 2020-12-30
      • 2021-05-01
      • 1970-01-01
      相关资源
      最近更新 更多