【发布时间】:2019-05-09 11:13:08
【问题描述】:
我需要在运行于 PostgreSQL 上的数据仓库中创建暂存表(staging table),并从使用 MySQL 数据库的 Magento 网站导入数据。我正尝试用 Python 来完成这项工作。
我为导入目的创建了以下查询,
1.请您检查并确认这是否正常? 或者有其他方法可以做到吗?
2. 另外我想知道我们如何在移植时管理 DATATYPE 不匹配问题?
3. 我们可以使用 Multicorn 之类的外部数据包装器(Foreign Data Wrapper,FDW)来实现吗?具体该怎么做?我只需要把源表中的少数几列传输到目标表(源表有 50 多列,而我只需要其中 15 列),这种情况下 FDW 可行吗?
如果有人可以发布示例或编辑以下代码,那将有很大帮助。
import psycopg2
import os
import time
import MySQLdb
import sys
import mysql.connector
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
#from utils.utils import get_global_config
def psql_command(msql, psql, msql_command, psql_command):
    """Run a SELECT on the source cursor and insert each row into the target.

    Args:
        msql: Source cursor; must support ``execute`` and iteration over
            the fetched rows.
        psql: Target cursor used for the inserts.
        msql_command: SELECT statement executed on ``msql``.
        psql_command: Parameterised INSERT executed on ``psql`` once per
            source row, with the row passed as the statement parameters.

    Exits the process via ``sys.exit`` if an insert raises
    ``psycopg2.Error``.
    """
    msql.execute(msql_command)
    # Iterate the cursor that was passed in; the previous code read a name
    # (cur_msql) that only exists inside the caller's scope.
    for row in msql:
        try:
            # Use the INSERT statement parameter, not an undefined `command`.
            psql.execute(psql_command, row)
        except psycopg2.Error as e:
            # Single-argument print() behaves the same on Python 2 and 3.
            print("Cannot execute the query!! {0}".format(e.pgerror))
            sys.exit("Some problem occured with the query!!!")
def dB_Fetch():
    """Copy selected Magento quote data from MySQL into PostgreSQL staging.

    Opens one connection to each database, creates the ``staging`` schema
    and the two staging tables if they do not exist, then passes every
    (SELECT, INSERT) pair in ``commands`` to ``psql_command``.

    Connection settings come from the module-level names ``host_mysql``,
    ``user_mysql``, ``pswd_mysql``, ``dbna_mysql`` and ``conn_string_psql``.
    They are not defined in this part of the file and must be provided
    elsewhere before this function is called.

    Exits the process with status 1 if either connection cannot be opened.
    Any other exception raised during the transfer is printed and swallowed;
    in that case the data load is not committed.
    """
    # MySQL connection
    try:
        cnx_msql = mysql.connector.connect(
            host=host_mysql,
            user=user_mysql,
            passwd=pswd_mysql,
            db=dbna_mysql,
        )
    except mysql.connector.Error as e:
        print("MYSQL: Unable to connect! {0}".format(e.msg))
        sys.exit(1)

    # Postgresql connection
    try:
        cnx_psql = psycopg2.connect(conn_string_psql)
    except psycopg2.Error as e:
        # .format() must be applied to the string, not to print()'s result.
        print('PSQL: Unable to connect!\n{0}'.format(e))
        # Do not leave the already-open MySQL connection dangling.
        cnx_msql.close()
        sys.exit(1)

    # Cursors initializations.
    # dictionary=True yields each source row as a dict, which is what the
    # %(name)s placeholders in the INSERT statements below expect.
    cur_msql = cnx_msql.cursor(dictionary=True)
    cur_psql = cnx_psql.cursor()

    try:
        SQL_create_Staging_schema = """CREATE SCHEMA IF NOT EXISTS staging
AUTHORIZATION postgres;"""

        SQL_create_sales_flat_quote = """CREATE TABLE IF NOT EXISTS
staging.sales_flat_quote
(
entity_id BIGINT
, store_id BIGINT
, customer_email TEXT
, customer_firstname TEXT
, customer_middlename TEXT
, customer_lastname TEXT
, customer_is_guest BIGINT
, customer_group_id BIGINT
, created_at TIMESTAMP WITHOUT TIME ZONE
, updated_at TIMESTAMP WITHOUT TIME ZONE
, is_active BIGINT
, items_count BIGINT
, items_qty BIGINT
, base_currency_code TEXT
, grand_total NUMERIC(12,4)
, base_to_global_rate NUMERIC(12,4)
, base_subtotal NUMERIC(12,4)
, base_subtotal_with_discount NUMERIC(12,4)
);"""

        SQL_create_sales_flat_quote_item = """CREATE TABLE IF NOT EXISTS
staging.sales_flat_quote_item
( store_id INTEGER
, row_total NUMERIC
, updated_at TIMESTAMP WITHOUT TIME ZONE
, qty NUMERIC
, sku CHARACTER VARYING
, free_shipping INTEGER
, quote_id INTEGER
, price NUMERIC
, no_discount INTEGER
, item_id INTEGER
, product_type CHARACTER VARYING
, base_tax_amount NUMERIC
, product_id INTEGER
, name CHARACTER VARYING
, created_at TIMESTAMP WITHOUT TIME ZONE
);"""

        print("Creating Schema")
        cur_psql.execute(SQL_create_Staging_schema)
        print("schema succesfully created")

        print("Creating staging.sales_flat_quote table")
        cur_psql.execute(SQL_create_sales_flat_quote)
        print("staging.sales_flat_quote table succesfully created")

        print("Creating staging.sales_flat_quote_item table")
        cur_psql.execute(SQL_create_sales_flat_quote_item)
        print("staging.sales_flat_quote_item table succesfully created")

        # commit() lives on the connection; cursors do not have it.
        cnx_psql.commit()

        print("Fetching data from source server")
        # Each entry is (SELECT run on MySQL, INSERT run on PostgreSQL).
        # Triple-quoted strings are required because the SQL spans lines.
        #
        # NOTE(review): the first INSERT names customer_id, base_row_total,
        # row_total and base_discount_amount, none of which appear in the
        # CREATE TABLE for staging.sales_flat_quote above. Either extend
        # that table definition or trim these column lists; confirm against
        # the source schema before running.
        commands = [
            (
                """SELECT customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate
,is_active from sales_flat_quote
where is_active=1;""",
                """INSERT INTO staging.sales_flat_quote
(customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate,is_active)
VALUES (%(customer_id)s, %(entity_id)s
, %(store_id)s, %(created_at)s, %(updated_at)s
, %(items_count)s, %(base_row_total)s
, %(row_total)s, %(base_discount_amount)s
, %(base_subtotal_with_discount)s
, %(base_to_global_rate)s, %(is_active)s)""",
            ),
            (
                """SELECT store_id,row_total,updated_at,qty,sku
,free_shipping,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at from
sales_flat_quote_item""",
                """INSERT INTO staging.sales_flat_quote_item
(store_id,row_total,updated_at,qty,sku,free_shipping
,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at)
VALUES (%(store_id)s, %(row_total)s, %(updated_at)s
, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s
, %(price)s, %(no_discount)s, %(item_id)s
, %(product_type)s, %(base_tax_amount)s, %(product_id)s
, %(name)s, %(created_at)s)""",
            ),
        ]

        # Distinct loop names: reusing `psql_command` here would shadow the
        # helper function and turn the call into "str is not callable".
        for select_sql, insert_sql in commands:
            psql_command(cur_msql, cur_psql, select_sql, insert_sql)

        # Commit only after every statement succeeded.
        cnx_psql.commit()
    except Exception as error:
        # Nothing is committed here; closing the PostgreSQL connection in
        # the finally block discards the uncommitted work.
        print("Error while fetching data from PostgreSQL {0}".format(error))
    finally:
        ## Closing cursors
        cur_msql.close()
        cur_psql.close()
        ## Closing database connections
        cnx_msql.close()
        cnx_psql.close()
# Run the transfer only when this file is executed directly as a script.
if __name__ == "__main__":
    dB_Fetch()
【问题讨论】:
-
你应该重新格式化和缩进你的代码以便我们理解它
-
我认为最简单的方法确实是使用 FDW 来执行此特定任务。您可以在 github (github.com/EnterpriseDB/mysql_fdw) 上找到 mysql_fdw。你具体需要什么,你在这件事上有什么困难?
标签: python mysql python-3.x postgresql magento