【问题标题】:Python script writes twice in to the tablesPython脚本两次写入表
【发布时间】:2018-04-23 01:31:22
【问题描述】:

我正在尝试使这种情况自动化。我有 2 个 .sql 文件(add1.sqladd2.sql),每个文件都有 1 个插入脚本。

我的目标是通过执行 add1.sql 中的行将一条记录写入 table1,并通过执行 add2.sql 中的行将一条记录写入 cm.cl,等待大约 5 分钟,这样后端服务就会运行。该服务从 DB1 写入到 DB2。然后我连接到 DB2 以查看来自 DB1 的记录是否与写入到 DB2 的记录相匹配。根据没有结果,会发送一封电子邮件。

下面是我的代码。除了两次写入DB1 之外,一切正常。所以,基本上插入了 4 条记录而不是 2 条。知道为什么它会写入 4 条记录吗?

import pypyodbc as pyodbc
import smtplib
sender = 'abc@abc.com'
receivers = ['abc@abc.com','xyz@abc.com']
import unittest
import time

class TestDB1(unittest.TestCase):


    def testing_master(self):

           Master_Conn = 'Driver=
 {SQLServer};Server=server\servername;Database=database;UID=userid;PWD=password'
           Master_db = pyodbc.connect(Master_Conn)
           Master_Cursor = Master_db.cursor()
           try:
               #Open, read and execute add_shell.sql
               file = open('C:\\aaa\\add1.sql', 'r')
               line = file.read()
               lines = line.replace('\n', ' ')
               file1 = open('C:\\aaa\\add2.sql', 'r')
               line1=file1.read()
               lines1=line1.replace('\n', ' ')
               Master_Cursor.execute(lines)
               time.sleep(1)
               Master_Cursor.execute(lines1)
               Master_db.commit()
               file.close()
               file1.close()

               #Get python object for latest record inserted in DB1
               Master_CID=Master_Cursor.execute("select col1 from tablename1 order by sequenceid desc").fetchone()
               #convert tuple to srting [0] gives first tuple element. 
               Master_CID_str=str(Master_CID[0])
               #Get GUID by stripping first 2 chars and last char.
               Master_CID_str=Master_CID_str[2:len(Master_CID_str)-1]
               Master_CLID=Master_Cursor.execute("select col2 from tablename2 order by sequenceid desc").fetchone()
               Master_CLID_str=str(Master_CLID[0])
               Master_CLID_str=Master_CLID_str[2:len(Master_CLID_str) - 1]



           # Wait for service that transfers data from one db to another DB to run
               time.sleep(310)
           finally:
                Master_Cursor.close()
                Master_db.close()

           return Master_CID,Master_CID_str,Master_CLID,Master_CLID_str


    def testing_int_instance(self):
           #unpacking return value of tuple from testing_master() function
           Master_CID,Master_CID_str,Master_CLID,Master_CLID_str=self.testing_master()
           print ("printing from testing_int_instance {0}".format(Master_CID))
           Int_Instance_Conn = 'Driver={SQL Server};Server=server2\servername2;Database=database2;UID=uid;PWD=password;'
           Int_db = pyodbc.connect(Int_Instance_Conn)
           Int_Cursor = Int_db.cursor()
           #return Int_db, Int_Cursor
           #execute select from  db where col matches that of one inserted in master db.
           Int_Instance_CID=Int_Cursor.execute("select col1 from table1 where cartridgemodelid = '%s'" %(Master_CID_str)).fetchone()
           print(Int_Instance_CID)
           smtpObj = smtplib.SMTP('22.101.1.333', 25)
           if (Master_CID==Int_Instance_CID):
               print("Matched")
               content="This email confirms successful  data transfer from Master to  Instance for col1: \n"
               message = "\r\n".join(["From:" + sender,"To:" + str(receivers[:]),"Subject: Test Result","",content +Master_CID_str])
               #smtpObj = smtplib.SMTP('22.101.2.222', 25)
               smtpObj.sendmail(sender, receivers, message)
           elif (Master_CID!=Int_Instance_CID):
               print("no match")
               content = "This email confirms failure of  data transfer from DB1 to DB2 for COL1: \n"
               message = "\r\n".join(["From:" + sender, "To:" + str(receivers[:]), "Subject: Test Result", "",content +Master_CID_str])
               smtpObj.sendmail(sender, receivers, message)
           Int_Instance_CLID=Int_Cursor.execute("select COL2 from table2 where col= '%s'" %(Master_CLID_str)).fetchone()
           print (Int_Instance_CLID)
           if (Master_CLID == Int_Instance_CLID):
               print ("Printing int_instance CLID {0}".format(Int_Instance_CLID))
               content = "This email confirms successful data transfer from DB1 to DB2 for COL: \n"
               message = "\r\n".join(
                  ["From:" + sender, "To:" + str(receivers[:]), "Subject: Test Result", "", content + Master_CLID_str])
               #smtpObj = smtplib.SMTP('22.101.2.222', 25)
               smtpObj.sendmail(sender, receivers, message)
               print ("Ids Matched")
           elif (Master_CLID != Int_Instance_CLID):
DB1 to DB2 for COL: \n"
               message = "\r\n".join(
                  ["From:" + sender, "To:" + str(receivers[:]), "Subject: Test Result", "", content + Master_CLID_str])
               #smtpObj = smtplib.SMTP('22.101.2.222', 25)
               smtpObj.sendmail(sender, receivers, message)
           smtpObj.quit()


           Int_db.close()

如果 name == 'ma​​in': unittest.main()

add1.sql 是:

DECLARE @Name VARCHAR(2000)
DECLARE @PartNumber VARCHAR(2000)
SELECT @Name='test'+convert(varchar,getdate(),108)
SELECT @PartNumber='17_00001_'+convert(varchar,getdate(),108)
DECLARE @XML XML
DECLARE @FileName VARCHAR(1000)
DECLARE @Id UNIQUEIDENTIFIER
SELECT @Id = NEWID()
SELECT @FileName = 'test.xml'
SELECT @XML='<model>
   <xml tags go here>
BEGIN
  INSERT INTO table1
         (ID,Name,Type,Desc,Number,Revision,Model,status,Modifiedby,Modifiedon)
   VALUES(@Id,@Name,'xyz','',@partnumber,'01',@XML,'A','453454-4545-4545-4543-345342343',GETUTCDATE())

add2.sql 是:

DECLARE @XML XML
DECLARE @CM_Name VARCHAR(2000)
DECLARE @FileName VARCHAR(1000)
DECLARE @PartNumber VARCHAR(2000)
DECLARE @Id UNIQUEIDENTIFIER
SELECT @Id=NEWID()
DECLARE @Name VARCHAR(2000)
DECLARE @CMId VARCHAR(2000)
DECLARE @CM_PartName VARCHAR(2000)
DECLARE @CM_Partnumber VARCHAR(2000)
SELECT @Name='test'+convert(varchar,getdate(),108)
SELECT @PartNumber='test'+convert(varchar,getdate(),108)
DECLARE @RowCount INT
DECLARE @Message VARCHAR(100);

SELECT @FileName = 'test.xml' 
SELECT @CMId = CM.CMID,
    @CM_Name = CM.CMName,
    @CM_PN  = CM.PN
    FROM cm.Model CM
    WHERE CM.MName LIKE 'test%'
    ORDER BY CM.ModifiedBy DESC 
SELECT @XML='<Layout>
   other xml tags...

BEGIN
  INSERT INTO cm.CL(ID, ModelID, Layout, Description, PN, Revision, CLayout, Status, ModifiedBy, ModifiedOn)
    SELECT TOP 1 @Id, @CMId, @Name, '', @PartNumber, '01', @XML, 'A', '453454-345-4534-4534-4534543545', GETUTCDATE()
    FROM cm.table1 CM
    WHERE CM.Name=@CM_Name
    AND CM.Partnumber=@CM_Partnumber

【问题讨论】:

  • 首先,最重要的一项:你在哪里调用这些函数?您只在此处定义它们。请修复第二种方法底部的代码。其次,除了表之外,没有人写入数据库。哪些表重复了两个脚本的数据?最后,其中一个脚本使用[TABLENAME]。那是什么?
  • 3) 两个表(来自两个插入脚本)重复数据。注意:如果我在 testing_master(self) 函数中注释掉 time.sleep(310),它只会将每个记录(来自每个插入脚本)写入每个表我不确定为什么它会写入两次(4 个记录写入 2 个表)与 time.sleep(310)。我需要它在那里,因为我必须等待 Windows 服务运行我什至尝试将 time.sleep(310) 排除在第一个函数之外,但这也不起作用。我不能使用拆卸方法来放置 time.sleep(310),因为我不希望它在每个函数之后运行。 4)实际表名进入[TABLENAME]。
  • 我忘记粘贴调用函数了。下面是第二个函数底部的内容。 if name__=='__main': unittest.main() 2) 我的意思是写入'table'而不是'database'
  • 我的原帖已经更新了。
  • 由于它确实写入表格并发送电子邮件,我怀疑问题是否与函数调用不正确有关。在我看来, time.sleep(310) 导致它写了两次。如果我评论该行,我的代码就可以正常工作,但就像我说的那样,我需要在那里等待 Windows 服务运行。如果您有兴趣,下面的链接提供了有关 unittest.main() 的详细信息。 elbenshira.com/blog/behind-pythons-unittest-main

标签: python sql sql-server xml


【解决方案1】:

目前,您正在呼叫test_master() 两次!首先作为您的命名方法,然后在您解压缩返回值时使用第二种方法。下面是在 Class 对象之外定义的方法的演示。如果按原样调用,testing_master 将运行两次。

考虑使用上下文管理器来读取使用with() 的 .sql 脚本,该脚本处理如下所示的打开和关闭 i/o 操作:

# FIRST CALL
def testing_master():
    #...SAME CODE...
    try:
        with open('C:\\aaa\\add1.sql', 'r') as file:
            lines = file.read().replace('\n', ' ')    
        Master_Cursor.execute(lines)
        Master_db.commit()

        time.sleep(1)

        with open('C:\\aaa\\add2.sql', 'r') as file1:
            lines1 = file1.read().replace('\n', ' ')    
        Master_Cursor.execute(lines1)
        Master_db.commit()

    #...SAME CODE...  
    return Master_CID, Master_CID_str, Master_CLID, Master_CLID_str


def testing_int_instance():
    # SECOND CALL
    Master_CID, Master_CID_str, Master_CLID, Master_CLID_str = testing_master()    
    #...SAME CODE...

if __name__ == "__main__": 
    testing_master()
    testing_int_instance()

注释掉 time(310) 似乎可行,但正如您提到的后台 Windows 服务无法有效运行,因此会中断数据库传输。

要解决此问题,请考虑在第一个方法的末尾调用第二个方法,将值作为参数传递,不带任何return 并删除解包行。然后,在主全局环境中,只运行testing_master()。当然,在 Class 定义中使用 self 进行限定。

def testing_master():
    #...SAME CODE...
    testing_int_instance(Master_CID, Master_CID_str, Master_CLID, Master_CLID_str)

def testing_int_instance(Master_CID, Master_CID_str, Master_CLID, Master_CLID_str):
    #...SKIP UNPACK LINE 
    #...CONTINUE WITH SAME CODE...

if __name__ == "__main__": 
    testing_master()

由于您的unittest,请考虑对原始设置稍作调整,您可以使用self 限定每个变量:

def testing_master():
    ...
    self.Master_CID=Master_Cursor.execute("select col1 from tablename1 order by sequenceid desc").fetchone()
    self.Master_CID_str=str(Master_CID[0])
    self.Master_CID_str=Master_CID_str[2:len(Master_CID_str)-1]
    self.Master_CLID=Master_Cursor.execute("select col2 from tablename2 order by sequenceid desc").fetchone()
    self.Master_CLID_str=str(Master_CLID[0])
    self.Master_CLID_str=Master_CLID_str[2:len(Master_CLID_str) - 1]

def testing_int_instance(self):
    # NO UNPACK LINE
    # ADD self. TO EVERY Master_* VARIABLE
    ...

【讨论】:

  • 啊。接得好。但是使用下面的代码,我得到 NameError: name 'testing_int_instance' is not defined def testing_master(): #...SAME CODE... testing_int_instance(Master_CID, Master_CID_str, Master_CLID, Master_CLID_str) def testing_int_instance(Master_CID, Master_CID_str, Master_CLID, Master_CLID_str ): #...SKIP UNPACK LINE #...CONTINUE WITH SAME CODE... if name == "main": testing_master()
  • 如果在我在解决方案中提到的类中,您是否使用self. 对其进行了限定?在这个答案中没有使用类,只使用定义的方法。
  • 是的。下面是我的代码。类 TestDB1(unittest.TestCase): def testing_master(self): connect to master global Master_db Master_db = pyodbc.connect(Master_Conn) Master_Cursor = Master_db.cursor() try: .. .. time.sleep(310) finally: Master_Cursor。 close() def testing_int_instance(Master_CID, Master_CID_str, Master_CLID, Master_CLID_str): .. .. if name == 'main': testing_master()
  • 我看不到你在哪里调用方法:self.testing_int_instance(...)testing_master 的末尾。另外,在参数前添加 self 作为第一个参数:def testing_int_instance(self, Master_CID, ...).
  • 使用 self.testing_int_instance(Master_CID, Master_CID_str, Master_CLID, Master_CLID_str) 和 def testing_int_instance(self,Master_CID,Master_CID_str,Master_CLID,Master_CLID_str):,它对每个表写入一次 - 是的!!非常感谢。但是,一旦我运行测试,pycharm 就会在 testing_int_instance 函数中显示错误。 Spinner 继续进行 testing_master 功能。 testMethod() TypeError: testing_int_instance() 缺少 4 个必需的位置参数:“Master_CID”、“Master_CID_str”、“Master_CLID”和“Master_CLID_str”
猜你喜欢
  • 1970-01-01
  • 2012-06-24
  • 1970-01-01
  • 2014-11-22
  • 2015-01-28
  • 1970-01-01
  • 2016-08-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多