我通过 Python、Boto3 和 Psycopg2 API 连接到 AWS Redshift，获取了临时用户凭证，并在 Redshift 中创建了表。我还将数据从 S3 存储桶复制到了该 Redshift 表中。为了提高可读性，我把代码拆分成了几个文件。
感谢 Stack Overflow 上的 @demicioglu，以及达拉斯/沃思堡 Postgres 聚会小组的 Jessica Sharp，他们提供的示例见：
https://github.com/sharpDBA/redshift_demo
dbConn.py — connection information
# Redshift cluster endpoint details. The table-creation script reads these as
# dbConn.host, dbConn.port and dbConn.database.
host, port, database = (
    '#####-redshift.#####.us-east-1.redshift.amazonaws.com',  # cluster endpoint
    5439,  # Redshift's default port
    '#####',  # database name
)
dbCred.py — credential information
import boto3
import psycopg2
import sys
# Identifiers used to request temporary credentials for the cluster.
DB_NAME = '######'
CLUSTER_IDENTIFIER = '######-redshift'
DB_USER = '#####'

try:
    client = boto3.client('redshift', region_name='us-east-1')
    # Ask Redshift for a temporary user name / password pair for DB_USER.
    # AutoCreate=False: DB_USER must already exist in the database.
    cluster_creds = client.get_cluster_credentials(
        DbUser=DB_USER,
        DbName=DB_NAME,
        ClusterIdentifier=CLUSTER_IDENTIFIER,
        AutoCreate=False,
    )
    # Read by the table-creation script as dbCred.temp_user / dbCred.temp_pswd.
    temp_user = cluster_creds['DbUser']
    temp_pswd = cluster_creds['DbPassword']
except Exception as ex:
    # Script-level boundary: nothing downstream can connect without these
    # credentials, so report the failure on stderr and stop.
    print("Exception name : " + ex.__class__.__name__, file=sys.stderr)
    print(str(ex), file=sys.stderr)
    print("Failed to get temporary credentials for the Redshift cluster",
          file=sys.stderr)
    sys.exit(1)
dbCreateTbl.py — create table in Redshift
import psycopg2
import dbCred
import dbConn
import sys
# Set Redshift cluster connection details
def CreateTable(schema, tblName):
schema=schema
tblName=tblName
try:
# Get AWS Redshift connection attributes
dbname = dbConn.database
host = dbConn.host
port = dbConn.port
# Get temporary database credentials
user = dbCred.temp_user
password = dbCred.temp_pswd
# Connect to AWS Redshift database
connect = psycopg2.connect(database=dbname,
host=host,
port=port,
user=user,
password=password)
cur = connect.cursor()
# SQL Query
cur.execute("CREATE TABLE " + schema + "." + tblName + " "
"(vendorid varchar(4), pickup_datetime TIMESTAMP, "
"dropoff_datetime TIMESTAMP, store_and_fwd_flag varchar(1), "
"ratecode int, pickup_longitude float(4), pickup_latitude float(4),"
"dropoff_logitude float(4), dropoff_latitude float(4), "
"passenger_count int, trip_distance float(40), fare_amount float(4), "
"extra float(4), mta_tax float(4), tip_amount float(4), "
"tolls_amount float(4), ehail_fee float(4), improvement_surcharge float(4), "
"total_amount float(4), payment_type varchar(4), trip_type varchar(4)) "
"DISTSTYLE EVEN SORTKEY (passenger_count, pickup_datetime);")
connect.commit()
#report any errors
except Exception as ex:
print("Exception name : " + ex.__class__.__name__)
print(str(ex))
print("Failed to open connection to Redshift database")
sys.exit(1)
#close all connections
finally:
cur.close()
connect.close()
if __name__ == "__main__":
    # Expect exactly two positional arguments: the schema and table names.
    if len(sys.argv) != 3:
        # Spaces between the placeholders so the usage line is readable.
        print("Usage: " + sys.argv[0] + " <SchemaName> <TableName>",
              file=sys.stderr)
        sys.exit(1)
    schema, tblName = sys.argv[1], sys.argv[2]
    CreateTable(schema, tblName)