【问题标题】:Assign/Substitute a list of values in a SQL statement for PostgreSQL using Python使用 Python 在 PostgreSQL 的 SQL 语句中分配/替换值列表
【发布时间】:2020-10-13 21:21:37
【问题描述】:

我有一个巨大的表,下面有很多分区表,但我只想查询/更新少数 (18) 个分区。所以我必须准备并运行 18 个 python 脚本(因为我必须按照我们的 DBA 指令在插入每个分区表后创建和关闭连接,因为每个 period 包含数百万条记录)才能运行我的插入语句。 所以我尝试了两种方法来解决这个问题: 一种是运行一个 for 循环来替换我的 SQL 语句中的句点列表,这涉及编写两个 python 脚本:

import os
import logging
#import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

list_of_periods = [
    {'period': '1940'},
    {'period': '1941_1945'},
    {'period': '1946_1950'},
    {'period': '1951_1955'},
    {'period': '1956_1960'},
    {'period': '1961_1965'},
    {'period': '1966_1970'},
    {'period': '1971_1975'},
    {'period': '1976_1980'},
    {'period': '1981_1985'},
    {'period': '1986_1990'},
    {'period': '1991_1995'},
    {'period': '1996_2000'},
    {'period': '2001_2005'},
    {'period': '2006_2010'},
    {'period': '2011_2015'},
    {'period': '2016_2020'},
    {'period': '2021_2025'}
]

if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info("  cpc = {}".format(cpc_name) + '\n')
    for period in list_of_periods:
        os.system('python sample_segment.py')


########
sample_segment.py

import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info("  cpc = {}".format(cpc_name) + '\n')
    connection = psycopg2.connect(user        = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
                                  password    = os.environ.get("DATABASE_PASS", "pass"),
                                  host        = os.environ.get("DATABASE_HOST", "psql.silver.com"),
                                  port        = 5432,
                                  dbname      = os.environ.get("DATABASE_NAME", "psql_db"),
                                  options     = "-c search_path=DATAVANT_O")                    
    with connection.cursor() as cursor:
        logging.info(str(connection.get_dsn_parameters()) + '\n')
        cursor.execute("SELECT version();")
        connection.commit()
        conn = cursor.fetchone()
        logging.info("You are connected to - " + str(conn))
        tempb = '''
        INSERT INTO DATAVANT_STG_O.mortality_index_{period}           
        SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period}; '''
        logging.info("Performing Insert Operation")
        cursor.execute(tempb)
        connection.commit()
        count = cursor.rowcount
        logging.info(str(count) + " - count for the period:  {period}")
        logging.info(" - count for the period:  {period}")
        connection.close()
        print("PostgreSQL connection is closed")

显然它不起作用,我当然怀疑我的 Python 编程技能,或者我正在寻找类似 @​​987654322@ 的东西 但无论如何我的意图是把下面的陈述转过来

INSERT INTO DATAVANT_STG_O.mortality_index_{period}           
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};

进入

INSERT INTO DATAVANT_STG_O.mortality_index_1940           
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1940;

INSERT INTO DATAVANT_STG_O.mortality_index_1941_1945           
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1941_1945;

etc...for 18 periods

我尝试的第二种方法是在我的 python 脚本中读取 YAML 文件。

import os
import logging
import psycopg2
import socket
import yaml
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

with open(r'/home/SILVER/user/test/periods.yaml') as file:
    YEARS = yaml.load(file, Loader=yaml.FullLoader)
    print("load yaml file" + '\n')
    print(YEARS)


if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info("  cpc = {}".format(cpc_name) + '\n')
    connection = psycopg2.connect(user        = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
                                  password    = os.environ.get("DATABASE_PASS", "pass"),
                                  host        = os.environ.get("DATABASE_HOST", "psql.host.com"),
                                  port        = 5432,
                                  dbname      = os.environ.get("DATABASE_NAME", "psql_db"),
                                  options     = "-c search_path=DATAVANT_O")                    
    with connection.cursor() as cursor:
        logging.info(str(connection.get_dsn_parameters()) + '\n')
        cursor.execute("SELECT version();")
        connection.commit()
        conn = cursor.fetchone()
        logging.info("You are connected to - " + str(conn))
        tempb = '''
        INSERT INTO DATAVANT_STG_O.mortality_index_YEARS[periods]
        SELECT * FROM DATAVANT_STG_O.tmp_mortality_YEARS[periods]; '''
        cursor.execute(tempb)
        logging.info("Performing Insert Operation")
        connection.commit()
        count = cursor.rowcount
        logging.info(str(count) + " - count for the period:  YEARS[periods]")
        logging.info(" - count for the period:  YEARS[periods]")
        connection.close()
        print("PostgreSQL connection is closed")

##YAML 文件

---
periods:

  - 1940
  - 1941_1945
  - 1946_1950
  - 1951_1955
  - 1956_1960
  - 1961_1965
  - 1966_1970
  - 1971_1975
  - 1976_1980
  - 1981_1985
  - 1986_1990
  - 1991_1995
  - 1996_2000
  - 2001_2005
  - 2006_2010
  - 2011_2015
  - 2016_2020
  - 2021_2025

这是我了解到无法将 YAML 文件中的值分配给 python 文件中的 SQL 语句的部分。

我对这个大问题表示歉意,但我只想说清楚并提供尽可能多的信息。

如果有任何建议可以帮助我使用上述任何一种方法解决此问题,我将不胜感激,或者随时向我推荐一个新方法,谢谢!

##Below 是输出,当我尝试迈克的方法时:

import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor
    connection = psycopg2.connect(user        = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
                                  password    = os.environ.get("DATABASE_PASS", "pass"),
                                  host        = os.environ.get("DATABASE_HOST", "psql.silver.com"),
                                  port        = 5432,
                                  dbname      = os.environ.get("DATABASE_NAME", "psql_db"),
                                  options     = "-c search_path=DATAVANT_O")                    
    with connection.cursor() as cursor:
        logging.info(str(connection.get_dsn_parameters()) + '\n')
        cursor.execute("SELECT version();")
        connection.commit()
        conn = cursor.fetchone()
        logging.info("You are connected to - " + str(conn))
        cursor.execute("""
        SELECT COUNT(*) FROM datavant_stg_o.mortality_index_{};""". format(period, period)
        )
        # Commit and close your connection here
        connection.commit()
        count = cursor.rowcount
        logging.info("Count for the period {} is: " . format(period, period) + str(count) +  '\n')
        connection.close()
        print("PostgreSQL connection is closed" + "\n")

(目前我只是尝试执行 count(*)s 来检查功能) 所以我期望分区的计数为:

SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1940 - 1001066
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1941_1945 - 1713850
etc which are the original counts when i query from pgAdmin

但我得到的输出为

Count for the period 1940 is: 1, 
Count for the period 1941_1945 is: 1 etc,

跟引号有关系吗?

【问题讨论】:

    标签: python sql postgresql yaml pyyaml


    【解决方案1】:

    如果您使用的是 python 3.6 或更高版本:

    periods = ['1940']
    periods.extend([f'{i}_{i + 4}' for i in range(1941, 2026, 5)])
    
    for period in periods:
        # Do your psycopg2 connection here and get your cursor
        cursor.execute(f"""
          INSERT INTO DATAVANT_STG_O.mortality_index_{period}
          SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};""")
        # Commit and close your connection here
    

    对于较早的python:

    periods = ['1940']
    periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])
    
    for period in periods:
        # Do your psycopg2 connection here and get your cursor
        cursor.execute("""
          INSERT INTO DATAVANT_STG_O.mortality_index_{}
          SELECT * FROM DATAVANT_STG_O.tmp_mortality_{};""". format(period, period)
        )
        # Commit and close your connection here
    

    【讨论】:

    • 迈克,感谢您的回答,我正在使用 python 3,但是在尝试执行它时出现以下错误 period.extend([f'{i}_{i + 4}' for i in range(1941, 2026, 5)]) ^ SyntaxError: invalid syntax and the caret(error) is under the closing quote.
    • @Mando 格式字符串必须在 3.6 或更高版本才能工作。请尝试另一个明确使用format() 的示例。在periodsexecute() 的设置中。
    • 另一个示例将句点列表替换为 sql 语句,但 SQL 语句本身不起作用,请在我原来的问题末尾找到上面的解释
    • @Mando 当你调用format() 时,你传递的参数应该对应于字符串中{} 占位符的数量,所以不要传递period 两次。当你执行select count(*)... 时,结果只有一行。请改用count = cursor.fetchone()[0]
    • 啊,我错过了,感谢您指出这一点。真的很感激!!
    猜你喜欢
    • 2020-05-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-04
    相关资源
    最近更新 更多