【Question title】: How to manage stored procedures in AWS Redshift using an automation or command line tool?
【Posted】: 2021-10-18 14:14:45
【Question description】:

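We have built many stored procedures in Redshift on AWS. We need to download and upload these stored procedures so that we can keep them in GitHub as a means of tracking changes.

These procedures eventually need to become part of a CloudFormation template, so that the infrastructure can be maintained as well.

Ideally this could be done with the AWS CLI, but there does not seem to be a command for it.

How can AWS Redshift stored procedures be managed in an automation/CI-CD environment?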

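【Question comments】:

    Tags: amazon-web-services stored-procedures continuous-integration amazon-redshift aws-cli


    【Answer 1】:

    I have a partially working solution. It covers the download half: it reads every procedure from the catalog and writes the source and some metadata to local files.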

    import json
    import psycopg2
    import os
    
    
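    def run_sql_commands(input_command, database="mydatabasename", host_port=1234, host_url="datawarehouse.redshift.amazonaws.com"):
        """
        Run one SQL statement and return all rows.

        :param input_command: sql string to execute
        :param database: which database to run the query
        :param host_port: port the cluster listens on
        :param host_url: hostname of the cluster
        :return: list of row tuples, or None if the query failed
        """
        results = None
        # db_user and db_pass will need to be set as environment variables
        db_user = os.environ["db_user"]
        db_pass = os.environ["db_pass"]
    
        conn = None
        try:
            # Keyword arguments avoid quoting problems when the password contains spaces
            conn = psycopg2.connect(dbname=database, port=host_port, user=db_user,
                                    host=host_url, password=db_pass)
            with conn.cursor() as cursor:
                cursor.execute(input_command)
                results = cursor.fetchall()
        except psycopg2.Error as e:
            # Report the failure instead of silently swallowing it
            print(f"Query failed: {e}")
            return None
        finally:
            # Always release the connection, even when the query fails
            if conn is not None:
                conn.close()
    
        return results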
    
    
    def get_arg_type(oid, database):
        # int() ensures only a numeric OID is ever interpolated into the SQL string
        sql = f"SELECT typname FROM pg_catalog.pg_type WHERE oid={int(oid)}"
        r = run_sql_commands(sql, database)
        return r[0][0]
    
    
    def download_all_procedures(database, file_location="./local-code-store"):
        get_all_stored_procedure_sql = """
        SELECT
            n.nspname,
            b.usename,
            p.proname,
            p.proargnames,
            p.proargtypes,
            p.prosrc
        FROM
            pg_catalog.pg_namespace n
        JOIN pg_catalog.pg_proc p ON
            pronamespace = n.oid
        JOIN pg_user b ON
            b.usesysid = p.proowner
        WHERE
            nspname NOT IN ('information_schema', 'pg_catalog');
        """
    
        r = run_sql_commands(get_all_stored_procedure_sql, database)
        counted_items = []
        # run_sql_commands returns None when the query fails, so fall back to an empty list
        for item in r or []:
            # nspname is the schema name; it is stored under the 'table' key below
            table, author, procedure_name, procedure_arguments, procedure_argtypes, procedure = item
            # proargtypes is a space-separated list of type OIDs; look each one up
            # in the same database the procedure came from
            t_list = [get_arg_type(this_oid, database=database)
                      for this_oid in procedure_argtypes.split()]
            meta_data = {'table': table,
                         'author': author,
                         'arguments': procedure_arguments,
                         'argument_types': t_list}
            base = f'{file_location}/{database}/Schemas/{table}/{procedure_name}'
            os.makedirs(os.path.dirname(base), exist_ok=True)
            # Normalise line endings so that git diffs stay clean
            with open(f'{base}.sql', 'w') as f:
                f.write(procedure.replace('\r\n', '\n'))
            with open(f'{base}.json', 'w') as f:
                counted_items.append(f.write(json.dumps(meta_data)))
    
        return counted_items
    
    
    if __name__ == "__main__":
        print("Starting...")
        c = download_all_procedures("mydatabase")
        print("... finished")
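
    The script only covers the download half. For the upload half, here is a minimal sketch that rebuilds a CREATE OR REPLACE PROCEDURE statement from one saved .sql/.json pair. The function name build_create_procedure is hypothetical, and it assumes the file layout written by download_all_procedures. It has known gaps: the saved metadata has no argument modes (IN/OUT/INOUT) and no type lengths, and a procedure body containing $$ would break the quoting.

```python
import json
from pathlib import Path


def build_create_procedure(sql_path):
    """Rebuild a CREATE OR REPLACE PROCEDURE statement from a saved .sql/.json pair.

    Hypothetical helper: assumes the layout written by download_all_procedures.
    """
    sql_path = Path(sql_path)
    body = sql_path.read_text()
    meta = json.loads(sql_path.with_suffix(".json").read_text())

    names = meta.get("arguments") or []
    types = meta.get("argument_types") or []
    # Pair each argument type with its name; use the bare type
    # when no name was stored for that position.
    args = []
    for i, type_name in enumerate(types):
        name = names[i] if i < len(names) and names[i] else ""
        args.append(f"{name} {type_name}".strip())

    schema = meta["table"]  # the download script stores the schema name under 'table'
    procedure_name = sql_path.stem
    return (
        f"CREATE OR REPLACE PROCEDURE {schema}.{procedure_name}({', '.join(args)})\n"
        f"AS $$\n{body}\n$$ LANGUAGE plpgsql;"
    )
```

    The returned string can be executed with any client that runs SQL against the cluster. Check the generated statement against the real procedure before relying on it in a pipeline.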
    
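
    For CI/CD runs where opening a psycopg2 connection is awkward, the Redshift Data API (boto3 client "redshift-data") may be an alternative, since it submits SQL over HTTPS using IAM credentials. The sketch below is an assumption, not something I have used in the solution above: run_statement is a hypothetical helper, and the client is passed in so the function does not depend on how credentials are set up.

```python
import time


def run_statement(client, sql, cluster_id, database, db_user, poll_seconds=1.0):
    """Submit one SQL statement through the Redshift Data API and wait for it.

    `client` is expected to behave like boto3.client("redshift-data").
    Returns the statement id once the statement has finished.
    """
    statement_id = client.execute_statement(
        ClusterIdentifier=cluster_id,
        Database=database,
        DbUser=db_user,
        Sql=sql,
    )["Id"]

    # The Data API is asynchronous, so poll until the statement reaches a final state.
    while True:
        description = client.describe_statement(Id=statement_id)
        status = description["Status"]
        if status == "FINISHED":
            return statement_id
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(
                f"Statement {statement_id} {status}: {description.get('Error')}"
            )
        time.sleep(poll_seconds)
```

    In a pipeline this would be called as run_statement(boto3.client("redshift-data"), ddl, "my-cluster", "mydatabase", "deploy_user"), where the cluster and user names are placeholders. The same operation is available from the AWS CLI as aws redshift-data execute-statement.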

    【Comments】:
