【Question title】:Port data from MySQL database to PostgreSQL
【Posted】:2019-05-09 11:13:08
【Question description】:

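I need to create a staging table in our data warehouse, which runs on PostgreSQL, and import data from our Magento website, which uses a MySQL database. I am trying to do this with Python.

I wrote the following script for the import.

1. Could you please check and confirm whether this is OK? Or is there another way to do it?

2. I would also like to know how to manage DATATYPE mismatches while porting.

3. Can we do something with a foreign data wrapper (FDW) such as Multicorn? How would we do it? I only need a few columns from the source (it has more than 50 columns and I need only 15) transferred to the destination, so would an FDW work?

It would be a great help if someone could post an example or edit the code below.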

import os
import sys

import mysql.connector
import psycopg2

# from utils.utils import get_global_config

# Connection settings. The original post does not show where these values
# come from; the environment variable names below are placeholders.
host_mysql = os.environ.get("MYSQL_HOST", "localhost")
user_mysql = os.environ.get("MYSQL_USER", "")
pswd_mysql = os.environ.get("MYSQL_PASSWORD", "")
dbna_mysql = os.environ.get("MYSQL_DATABASE", "")
conn_string_psql = os.environ.get("PG_CONN_STRING", "")


def copy_rows(cur_msql, cur_psql, select_sql, insert_sql):
    """Run select_sql on MySQL and insert every returned row into PostgreSQL."""
    cur_msql.execute(select_sql)

    # Each row is a dict (dictionary=True), matching the %(name)s placeholders.
    for row in cur_msql:
        try:
            cur_psql.execute(insert_sql, row)
        except psycopg2.Error as e:
            print("Cannot execute the query!!", e.pgerror)
            sys.exit("Some problem occurred with the query!!!")


def dB_Fetch():
    # MySQL connection
    try:
        cnx_msql = mysql.connector.connect(
            host=host_mysql, user=user_mysql,
            password=pswd_mysql, database=dbna_mysql)
    except mysql.connector.Error as e:
        print("MYSQL: Unable to connect!", e.msg)
        sys.exit(1)

    # PostgreSQL connection
    try:
        cnx_psql = psycopg2.connect(conn_string_psql)
    except psycopg2.Error as e:
        print("PSQL: Unable to connect!\n{0}".format(e))
        sys.exit(1)

    # Cursor initialization
    cur_msql = cnx_msql.cursor(dictionary=True)
    cur_psql = cnx_psql.cursor()

    try:
        SQL_create_Staging_schema = """CREATE SCHEMA IF NOT EXISTS staging
            AUTHORIZATION postgres;"""

        SQL_create_sales_flat_quote = """CREATE TABLE IF NOT EXISTS
            staging.sales_flat_quote
            (
                  entity_id                   BIGINT
                , store_id                    BIGINT
                , customer_email              TEXT
                , customer_firstname          TEXT
                , customer_middlename         TEXT
                , customer_lastname           TEXT
                , customer_is_guest           BIGINT
                , customer_group_id           BIGINT
                , created_at                  TIMESTAMP WITHOUT TIME ZONE
                , updated_at                  TIMESTAMP WITHOUT TIME ZONE
                , is_active                   BIGINT
                , items_count                 BIGINT
                , items_qty                   BIGINT
                , base_currency_code          TEXT
                , grand_total                 NUMERIC(12,4)
                , base_to_global_rate         NUMERIC(12,4)
                , base_subtotal               NUMERIC(12,4)
                , base_subtotal_with_discount NUMERIC(12,4)
            );"""

        SQL_create_sales_flat_quote_item = """CREATE TABLE IF NOT EXISTS
            staging.sales_flat_quote_item
            (
                  store_id        INTEGER
                , row_total       NUMERIC
                , updated_at      TIMESTAMP WITHOUT TIME ZONE
                , qty             NUMERIC
                , sku             CHARACTER VARYING
                , free_shipping   INTEGER
                , quote_id        INTEGER
                , price           NUMERIC
                , no_discount     INTEGER
                , item_id         INTEGER
                , product_type    CHARACTER VARYING
                , base_tax_amount NUMERIC
                , product_id      INTEGER
                , name            CHARACTER VARYING
                , created_at      TIMESTAMP WITHOUT TIME ZONE
            );"""

        print("Creating schema")
        cur_psql.execute(SQL_create_Staging_schema)
        print("Schema successfully created")

        print("Creating staging.sales_flat_quote table")
        cur_psql.execute(SQL_create_sales_flat_quote)
        print("staging.sales_flat_quote table successfully created")

        print("Creating staging.sales_flat_quote_item table")
        cur_psql.execute(SQL_create_sales_flat_quote_item)
        print("staging.sales_flat_quote_item table successfully created")

        # commit() belongs to the connection, not the cursor.
        cnx_psql.commit()
        print("Fetching data from source server")

        # Pairs of (SELECT on MySQL, INSERT on PostgreSQL).
        commands = [
            (
                # NOTE: customer_id, base_row_total, row_total and
                # base_discount_amount are not columns of the
                # staging.sales_flat_quote table created above, so this
                # INSERT fails until the two column lists agree.
                """SELECT customer_id, entity_id, store_id, created_at, updated_at
                    , items_count, base_row_total, row_total, base_discount_amount
                    , base_subtotal_with_discount, base_to_global_rate
                    , is_active
                FROM sales_flat_quote
                WHERE is_active = 1""",

                """INSERT INTO staging.sales_flat_quote
                    (customer_id, entity_id, store_id, created_at, updated_at
                    , items_count, base_row_total, row_total, base_discount_amount
                    , base_subtotal_with_discount, base_to_global_rate, is_active)
                VALUES (%(customer_id)s, %(entity_id)s
                    , %(store_id)s, %(created_at)s, %(updated_at)s
                    , %(items_count)s, %(base_row_total)s
                    , %(row_total)s, %(base_discount_amount)s
                    , %(base_subtotal_with_discount)s
                    , %(base_to_global_rate)s, %(is_active)s)""",
            ),
            (
                """SELECT store_id, row_total, updated_at, qty, sku
                    , free_shipping, quote_id, price, no_discount, item_id
                    , product_type, base_tax_amount, product_id, name, created_at
                FROM sales_flat_quote_item""",

                """INSERT INTO staging.sales_flat_quote_item
                    (store_id, row_total, updated_at, qty, sku, free_shipping
                    , quote_id, price, no_discount, item_id, product_type
                    , base_tax_amount, product_id, name, created_at)
                VALUES (%(store_id)s, %(row_total)s, %(updated_at)s
                    , %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s
                    , %(price)s, %(no_discount)s, %(item_id)s
                    , %(product_type)s, %(base_tax_amount)s, %(product_id)s
                    , %(name)s, %(created_at)s)""",
            ),
        ]

        for select_sql, insert_sql in commands:
            copy_rows(cur_msql, cur_psql, select_sql, insert_sql)

        # Commit only after every table has been copied.
        cnx_psql.commit()

    except (Exception, psycopg2.Error) as error:
        # Discard the partial load instead of committing it.
        cnx_psql.rollback()
        print("Error while copying data to PostgreSQL", error)
    finally:
        # Closing cursors
        cur_msql.close()
        cur_psql.close()
        # Closing database connections
        cnx_msql.close()
        cnx_psql.close()


if __name__ == '__main__':
    dB_Fetch()
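
On question 2 (data type mismatches), one common approach is to pick a PostgreSQL type for each MySQL column type before creating the staging table. The sketch below is a small illustrative mapping, not an exhaustive or authoritative one. The names `MYSQL_TO_PG`, `UNSIGNED_OVERRIDES` and `pg_type_for` are hypothetical and introduced here only for illustration. It assumes the input strings look like the `COLUMN_TYPE` values that MySQL reports in `information_schema.columns`, for example `int(10) unsigned` or `decimal(12,4)`.

```python
import re

# Illustrative mapping from MySQL base types to PostgreSQL types (assumption:
# only the types listed here occur in the source tables).
MYSQL_TO_PG = {
    "tinyint": "SMALLINT",
    "smallint": "SMALLINT",
    "mediumint": "INTEGER",
    "int": "INTEGER",
    "bigint": "BIGINT",
    "decimal": "NUMERIC",
    "float": "REAL",
    "double": "DOUBLE PRECISION",
    "date": "DATE",
    "datetime": "TIMESTAMP WITHOUT TIME ZONE",
    "timestamp": "TIMESTAMP WITHOUT TIME ZONE",
    "char": "CHAR",
    "varchar": "VARCHAR",
    "text": "TEXT",
    "mediumtext": "TEXT",
    "longtext": "TEXT",
    "blob": "BYTEA",
}

# PostgreSQL has no unsigned integers, so these unsigned MySQL types need a
# wider target type to hold their full range.
UNSIGNED_OVERRIDES = {
    "smallint": "INTEGER",
    "int": "BIGINT",
    "bigint": "NUMERIC(20,0)",
}


def pg_type_for(mysql_column_type):
    """Return a PostgreSQL type for a MySQL COLUMN_TYPE string."""
    match = re.match(r"\s*([a-z]+)\s*(?:\(([^)]*)\))?(.*)",
                     mysql_column_type.lower())
    if match is None:
        raise ValueError("Unrecognised type: %r" % mysql_column_type)
    base, args, rest = match.groups()
    if base not in MYSQL_TO_PG:
        raise ValueError("No mapping for MySQL type: %r" % base)
    if "unsigned" in rest and base in UNSIGNED_OVERRIDES:
        return UNSIGNED_OVERRIDES[base]
    pg_type = MYSQL_TO_PG[base]
    # Keep precision/length for types where it is meaningful; the display
    # width of integer types such as int(10) is dropped.
    if args and base in ("decimal", "char", "varchar"):
        return "%s(%s)" % (pg_type, args)
    return pg_type
```

For instance, `pg_type_for("decimal(12,4)")` gives `NUMERIC(12,4)`, which matches the types used for the money columns in the staging table above, while `pg_type_for("int(10) unsigned")` gives `BIGINT`.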

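【Comments】:

  • You should reformat and indent your code so that we can understand it.
  • I think the simplest way really is to use an FDW for this particular task. You can find mysql_fdw on GitHub (github.com/EnterpriseDB/mysql_fdw). What exactly do you need, and what difficulty are you having with it?

Tags: python mysql python-3.x postgresql magento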


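【Solution 1】:

Solved with the help of the comment from Jaisus.

Thanks for your comment: "I think the simplest way really is to use an FDW for this particular task. You can find mysql_fdw on GitHub (github.com/EnterpriseDB/mysql_fdw). What exactly do you need, and what difficulty are you having with it?" – Jaisus, May 9 at 12:26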
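
The answer does not show the setup that was actually used, so the following is only a sketch of how the mysql_fdw route might look, based on the statement forms in the mysql_fdw README (`CREATE SERVER`, `CREATE USER MAPPING`, `CREATE FOREIGN TABLE`). The helper name `mysql_fdw_statements`, the server name `mysql_server`, the `_mysql` suffix on the foreign table and all connection values are hypothetical. It assumes the mysql_fdw extension is installed on the PostgreSQL server, that the `staging` schema and target table already exist, and that the foreign table may declare only the subset of source columns that is needed.

```python
def _lit(value):
    """Quote a value as an SQL string literal (single quotes are doubled)."""
    return "'" + str(value).replace("'", "''") + "'"


def mysql_fdw_statements(host, port, mysql_user, mysql_password, mysql_db,
                         table, columns, pg_role="postgres",
                         server="mysql_server", schema="staging"):
    """Build the SQL needed to expose one MySQL table through mysql_fdw.

    columns is a list of (name, postgres_type) pairs; identifiers are
    assumed to be plain lowercase names that need no quoting.
    """
    col_defs = ",\n    ".join("%s %s" % (name, pg_type)
                              for name, pg_type in columns)
    col_names = ", ".join(name for name, _ in columns)
    foreign = "%s.%s_mysql" % (schema, table)
    return [
        "CREATE EXTENSION IF NOT EXISTS mysql_fdw;",
        "CREATE SERVER %s FOREIGN DATA WRAPPER mysql_fdw "
        "OPTIONS (host %s, port %s);" % (server, _lit(host), _lit(port)),
        "CREATE USER MAPPING FOR %s SERVER %s "
        "OPTIONS (username %s, password %s);"
        % (pg_role, server, _lit(mysql_user), _lit(mysql_password)),
        "CREATE FOREIGN TABLE %s (\n    %s\n) SERVER %s "
        "OPTIONS (dbname %s, table_name %s);"
        % (foreign, col_defs, server, _lit(mysql_db), _lit(table)),
        # Copy only the listed columns from MySQL into the staging table.
        "INSERT INTO %s.%s (%s) SELECT %s FROM %s;"
        % (schema, table, col_names, col_names, foreign),
    ]
```

Each returned statement could then be passed to `cur_psql.execute(...)` from the script in the question, followed by `cnx_psql.commit()`. With this approach the rows move inside PostgreSQL in a single `INSERT ... SELECT`, so the Python loop over individual rows is no longer needed.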

【Discussion】:
