【问题标题】:Using Twisted AMP with Database insertion将 Twisted AMP 与数据库插入一起使用
【发布时间】:2012-04-25 16:19:24
【问题描述】:

我正在学习如何使用 Twisted AMP。我正在开发一个程序,将数据从客户端发送到服务器并将数据插入 SQLite3 DB。然后服务器将结果发送回客户端,指示成功或错误(尝试和除外可能不是最好的方法,但在我解决主要问题时它只是一个临时解决方案)。为了做到这一点,我修改了一个示例,该示例最初进行了求和并返回了结果,因此我意识到这可能不是执行我想做的最有效的方法。特别是我正在尝试对多次插入进行一些计时(即多次将数据发送到服务器以进行多次插入)并且我已经包含了我编写的代码。它可以工作,但显然它不是发送多个数据以进行插入的好方法,因为我在运行反应器之前执行多个连接。

我已经尝试了几种方法来解决这个问题,包括将 ClientCreator 传递给 reactor.callWhenRunning() 但你不能使用 deferred 来做到这一点。

任何有关如何执行此操作的建议、建议或帮助将不胜感激。这是代码。

服务器:

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
import sqlite3, time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class Protocol(amp.AMP):
    def __init__(self):     
       self.conn = sqlite3.connect('biomed1.db')
       self.c =self.conn.cursor()
       self.res=None

    @Insert.responder
    def dbInsert(self, data):
        self.InsertDB(data) #call the DB inserter
        result=self.res     # send back the result of the insertion
        return {'insert_result': result}

    def InsertDB(self,data):
      tm=time.time()
      print "insert time:",tm
      chx=data
      PID=2
      device_ID=5
      try:
        self.c.execute("INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES ('%s','%s','%s')" % (chx, PID, device_ID))    
      except Exception, err:
             print err
             self.res=0
      else:
             self.res=1

      self.conn.commit()


pf = Factory()
pf.protocol = Protocol
reactor.listenTCP(1234, pf) 
reactor.run()

客户:

from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.protocols import amp
import time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

def connected(protocol):
    return protocol.callRemote(Insert, data=5555).addCallback(gotResult)

def gotResult(result):
    print 'insert_result:', result['insert_result']
    tm=time.time()
    print "stop", tm    

def error(reason):
    print "error", reason

tm=time.time()
print "start",tm
for i in range (10): #send data over ten times
  ClientCreator(reactor, amp.AMP).connectTCP(
     '127.0.0.1', 1234).addCallback(connected).addErrback(error)

reactor.run()

代码结束。

谢谢。

【问题讨论】:

    标签: python twisted


    【解决方案1】:

    很少有东西可以改进您的服务器代码。

    首先:在twisted 中不鼓励使用直接数据库访问函数,因为它们通常会导致阻塞。 Twisted 对数据库访问有很好的抽象,它为数据库连接提供了扭曲的方法 - twisted.adbapi

    现在重用 db 连接:如果您想在多个协议实例中重用某些资产(如数据库连接),您应该在 Factory 的构造函数中初始化这些资产,或者如果您不想在启动时启动这些东西,创建一个资源访问方法,该方法将在第一次方法调用时启动资源,然后将其分配给类变量并在后续调用时返回。

    Factory创建特定Protocol实例时,会在协议内部添加对自身的引用,见line 97 of twisted.internet.protocol

    然后在您的协议实例中,您可以访问共享数据库连接实例,例如:

    self.factory.whatever_name_for_db_connection.doSomething() 
    

    重做的服务器代码(我没有可用的 python、twisted 甚至像样的 IDE,所以这几乎没有经过测试,一些错误是可以预料的)

    from twisted.protocols import amp
    from twisted.internet import reactor
    from twisted.internet.protocol import Factory
    import time
    
    class AMPDBAccessProtocolFactory(Factory):
        def getDBConnection(self):
            if 'dbConnection' in dir(self):
                return self.dbConnection
            else:
                self.dbConnection = SQLLiteTestConnection(self.dbURL)
                return self.dbConnection
    
    class SQLLiteTestConnection(object):
        """
        Provides abstraction for database access and some business functions.
        """
        def __init__(self,dbURL):
            self.dbPool =  adbapi.ConnectionPool("sqlite3" , dbURL,  check_same_thread=False)
    
        def insertBTData4(self,data):
            query = "INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES (%s,%s,%s)" 
            tm=time.time()
            print "insert time:",tm
            chx=data
            PID=2
            device_ID=5
            dF = self.dbPool.runQuery(query,(chx, PID, device_ID)) 
            dF.addCallback(self.onQuerySuccess,insert_data=data)
            return dF
        def onQuerySuccess(self,insert_data,*r):
            """
            Here you can inspect query results or add any other valuable information to be parsed at client.
            For the test sake we will just return True to a customer if query was a success.
            original data available at kw argument insert_data
            """
            return True
    
    
    class Insert(amp.Command):
        arguments = [('data', amp.Integer())]
        response = [('insert_result', amp.Integer())]
    
    class MyAMPProtocol(amp.AMP):
    
        @Insert.responder
        def dbInsert(self, data):
            db = self.factory.getDBConnection()
            dF = db.insertBTData4(data)
            dF.addErrback(self.onInsertError,data)
            return dF
    
        def onInsertError(self, error, data):
            """
            Here you could do some additional error checking or inspect data 
            which was handed for insert here. For now we will just throw the same exception again
            so that the client gets notified
            """
            raise error
    
    if __name__=='__main__':
        pf = AMPDBAccessProtocolFactory()
        pf.protocol = MyAMPProtocol
        pf.dbURL='biomed1.db'
        reactor.listenTCP(1234, pf) 
        reactor.run()
    

    现在到客户端。如果 AMP 遵循整体 RPC 逻辑(目前无法对其进行测试),它应该能够在多个调用中细读相同的连接。因此,我创建了一个 ServerProxy 类,该类将保存该可阅读的协议实例并为调用提供抽象:

    from twisted.internet import reactor
    from twisted.internet.protocol import ClientCreator
    from twisted.protocols import amp
    import time
    
    class Insert(amp.Command):
        arguments = [('data', amp.Integer())]
        response = [('insert_result', amp.Integer())]
    
    class ServerProxy(object):
        def connected(self,protocol):
            self.serverProxy = protocol # assign protocol as instance variable
            reactor.callLater(5,self.startMultipleInsert) #after five seconds start multiple insert procedure
    
        def remote_insert(self,data):
            return self.serverProxy.callRemote(Insert, data)
    
        def startMultipleInsert(self):
            for i in range (10): #send data over ten times
                dF = self.remote_insert(i)
                dF.addCallback(self.gotInsertResult)
                dF.addErrback(error)
    
        def gotInsertResult(self,result):
            print 'insert_result:', str(result)
            tm=time.time()
            print "stop", tm    
    
    def error(reason):
        print "error", reason
    
    
    def main():
        tm=time.time()
        print "start",tm
        serverProxy = ServerProxy()
        ClientCreator(reactor, amp.AMP).connectTCP('127.0.0.1', 1234).addCallback(serverProxy.connected).addErrback(error)
        reactor.run()    
    
    if __name__=='__main__':
        main()
    

    【讨论】:

    • 非常感谢。我目前正在研究您的解决方案。我最初使用 adbapi 但发生的事情是,当 adbapi 返回延迟时,AMP 结果已经返回了一个值。基本上我有两个串联的延迟(adbapi 和远程调用)如果我正在等待从客户端完成插入,那么(据我所见)adbapi 没有任何优势。但是,您可能已经找到了解决此问题的方法,所以我会让您的代码运行并看看会发生什么。非常感谢。
    • 我怀疑您正在考虑采用标准线性代码方式的扭曲过程。这是我刚开始从事扭曲项目时必须克服的主要障碍。在扭曲中,您不能认为您的执行流程是线性的。因此 - 例如,您可以快速执行多个数据库调用,而无需等待下一行的回复。无论你在哪里收到延期,你还没有执行任何事情(在大多数情况下),你已经把一个工作交给了执行。
    • 我是 Twisted 的新手,所以你是对的。我运行了您的代码并收到此错误:文件“DBAserver4.py”,第 48 行,在 dbInsert db = self.protocol.getDBConnection() exceptions.AttributeError: 'MyAMPProtocol' object has no attribute 'protocol' - 我尝试了各种方法让它工作,但无济于事。我做了一个 hack 并且能够运行,但它挂在了查询格式上,我将使用它。但我不明白为什么它在 MyAMPProtocol 中看不到协议。再次感谢。
    • 那行得通。我应该想尝试一下。我将处理查询格式,它应该运行。谢谢!
    • 我修复了查询格式,还将插入响应更改为 amp.Boolean 和 dbInsert 中的返回格式以匹配插入响应格式。在正常操作中,我在客户端看到了 True (onQuerySuccess)。但是,即使我在 onQuerySuccess 中将返回值更改为 False,我仍然会收到一个 True 发送回客户端。此外,如果我在查询字符串中生成错误,则 errBack onInsertError 不会捕获它。我尝试了许多变通方法,但没有解决方案。我会继续努力。如果您有兴趣,我可以重新发布代码更改。
    猜你喜欢
    • 1970-01-01
    • 2013-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-03
    • 2011-10-23
    • 2014-12-18
    • 1970-01-01
    相关资源
    最近更新 更多