【问题标题】:Issues with concurrent inserts on Redshift tableRedshift 表上的并发插入问题
【发布时间】:2019-11-27 14:03:22
【问题描述】:

我正在尝试使用 AWS 胶水上的 python 脚本同时处理插入/更新到 redshift 数据库。我正在使用 pg8000 库来执行我所有的数据库操作。并发插入/更新失败并出现错误Error Name:1023 ,Error State:XX000)。在研究错误时,我发现该错误与Serializable Isolation 有关。

任何人都可以查看代码并确保在插入/更新发生时不会发生冲突吗?

我尝试在调用类中使用随机睡眠时间。它适用于几个案例,但随着工人数量的增加。插入/更新案例失败。

    import sys
    import time
    import concurrent.futures
    import pg8000
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME','REDSHIFT_HOST','REDSHIFT_PORT','REDSHIFT_DB','REDSHIFT_USER_NAME','REDSHIFT_USER_PASSWORD'])

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    job_run_id = args['JOB_RUN_ID']
    maximum_workers = 5

    def executeSql(sqlStmt):
      conn = pg8000.connect(database=args['REDSHIFT_DB'],user=args['REDSHIFT_USER_NAME'],password=args['REDSHIFT_USER_PASSWORD'],host=args['REDSHIFT_HOST'],port=int(args['REDSHIFT_PORT']))
      conn.autocommit = True
      cur = conn.cursor()
      cur.execute(sqlStmt)
      cur.close()
      conn.close()


    def executeSqlProcedure(procedureName, procedureArgs = ""):
        try:
            logProcStrFormat  = "CALL table_insert_proc('{}','{}','{}','{}',{},{})"
            #Insert into the log table - create the record
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'pending','','getdate()','null')) #Code fails here
            #Executing the procedure
            procStrFormat = "CALL {}({})"
            executeSql(procStrFormat.format(procedureName,procedureArgs))
            print("Printing from {} process at ".format(procedureName),time.ctime())
            #Update the record in log table to complete
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'complete','','null','getdate()')) #Code fails here
        except Exception as e:
            errorMsg = str(e.message["M"])
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'failure',errorMsg,'null','getdate()'))
            raise 
            sys.exit(1)


    def runDims():
      dimProcedures = ["test_proc1","test_proc2","test_proc3","test_proc4","test_proc5"]

      with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, dimProcedures))


    def runFacts():
      factProcedures = ["test_proc6","test_proc7","test_proc8","test_proc9"]

      with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, factProcedures))    


    runDims()
    runFacts()

我希望插入/更新发生在日志表中而不会锁定/出错

【问题讨论】:

  • 我建议存储在中间 s3 中,然后使用复制功能。从 s3 到 Redshift 的复制可以同时进行,而且速度要快得多

标签: python amazon-web-services amazon-redshift aws-glue pg8000


【解决方案1】:

Amazon Redshift 不适用于大量小的 INSERT 语句。

来自Use a Multi-Row Insert - Amazon Redshift

如果 COPY 命令不是一个选项并且您需要 SQL 插入,请尽可能使用多行插入。当您一次只添加一行或几行数据时,数据压缩效率很低。

多行插入通过批处理一系列插入来提高性能。下面的示例使用单个 INSERT 语句将三行插入到一个四列表中。这仍然是一个小插入,只是为了说明多行插入的语法。

insert into category_stage values
(default, default, default, default),
(20, default, 'Country', default),
(21, 'Concerts', 'Rock', default);

或者,将数据输出到 Amazon S3,然后使用 COPY 命令执行批量加载。这将更加高效,因为它可以跨所有节点并行执行负载。

【讨论】:

    猜你喜欢
    • 2022-01-17
    • 1970-01-01
    • 1970-01-01
    • 2012-08-08
    • 1970-01-01
    • 1970-01-01
    • 2011-03-07
    • 2016-04-27
    • 2017-03-18
    相关资源
    最近更新 更多