【问题标题】:Twisted will not send data back only if I use async DB ops只有当我使用异步数据库操作时,Twisted 才会发回数据
【发布时间】:2015-09-22 06:08:01
【问题描述】:

在处理了 inlineCallbacks 和 twisted/txredisapi 的 yield 之后,我可以将数据保存到 redis 中。感谢 txredisapi 的作者。现在我遇到了一个新问题,套接字服务器在保存到数据库之前/之后不会发送回客户端。

Twisted 提供如下简单的套接字服务器:

from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
    def dataReceived(self, data): 
        self.transport.write(data) ### write back 

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

reactor.listenTCP(8000, EchoFactory)
recctor.run()

我的代码是类似的,只是有额外的数据库操作。

#!/usr/bin/env python

import time
import binascii
import txredisapi

from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory
from twisted.enterprise import adbapi
from twisted.python import log

from dmpack import Dmpack
from dmdb import Dmdb
from dmconfig import DmConf

dm = Dmpack()
conf = DmConf().loadConf()
rcs = txredisapi.lazyConnection(password=conf['RedisPassword'])
dbpool = adbapi.ConnectionPool("MySQLdb",db=conf['DbName'],user=conf['DbAccount'],\
    passwd=conf['DbPassword'],host=conf['DbHost'],\
    use_unicode=True,charset=conf['DbCharset'])

def getDataParsed(data):
    realtime = None
    period = None
    self.snrCode = dm.snrToAscii(data[2:7])    
    realtime = data[7:167] # save it into redis
    period = data[167:-2] # save it into SQL
    return (snrCode, realtime, period)

class PlainTCP(protocol.Protocol):
    def __init__(self, factory):
        self.factory = factory
        self.factory.numConnections = 0
        self.snrCode = None 
        self.rData = None
        self.pData = None
        self.err = None

    def connectionMade(self):
        self.factory.numConnections += 1
        print "Nr. of connections: %d\n" %(self.factory.numConnections)
        self.transport.write("Hello remote\r\n") # it only prints very 5 connections.

    def connectionLost(self, reason):
        self.factory.numConnections -= 1
        print "Nr. of connections: %d\n" %(self.factory.numConnections)

    @defer.inlineCallbacks
    def dataReceived(self, data):
        global dbpool, rcs
        (self.snrCode,rDat,pDat) = getDataParsed(data)

        if self.snrCode == None or rDat == None or pDat == None:
            err = "Bad format"
        else:
            err = "OK"
        print "err:%s"%(err) # debug print to show flow control
        self.err = err 

        self.transport.write(self.snrCode)
        self.transport.write(self.err)
        self.transport.write(rDat)
        self.transport.write(pDat) 
        self.transport.loseConnection()

        if self.snrCode != None and rDat != None and pDat != None:    
            res = yield self.saveRealTimeData(rcs, rDat)        
            res = yield self.savePeriodData(dbpool, pDat, conf)

        print "err2:%s"%(err)  # debug print to show flow control


    @defer.inlineCallbacks
    def saveRealTimeData(self, rc, dat):
        key = "somekey"
        val = "somedata"
        yield rc.set(key,val)
        yield rc.expire(key,30)

    @defer.inlineCallbacks
    def savePeriodData(self,rc,dat,conf):
        query = "some SQL statement"
        yield rc.runQuery(query)

class PlainTCPFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return PlainTCP(self)

def main():
    dmdb = Dmdb()
    if not dmdb.detectDb():
        print "Please run MySQL RDBS first."
        sys.exit()

    log.startLogging(sys.stdout)

    reactor.listenTCP(8080, PlainTCPFactory())
    reactor.run()

if __name__ == "__main__":
    main()

还有我的客户的剪辑,这是一个简单的客户:

def connectSend(host="127.0.0.1",port=8080):
    global packet
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))
        s.sendall(''.join(packet))
        data = s.recv(1024)
        s.close()
        print 'Received', repr(data)
    except socket.error, err:
        print "Remote socket is not available: %s"%str(err)
        sys.exit(1)

目前的状态是:

  • 如果禁用 @defer.inlineCallbacks 和 dataReceived() 的 yield 操作,connectionMode() 和 dataReceived() 内部的 self.transport.write() 和 dataReceived() 都可以向客户端输出数据。
  • 如果我们启用了@defer.inlineCallbacks 和 SQL/Redis 的两个 yield DB 操作,那么 connectionMode() 内部的 self.transport.write() 会每 5 个连接打印一次,而 dataReceived() 将不会向客户端输出任何数据。
  • 不管@defer.inlineCallbacks 无论如何,调试打印语句都会打印在日志上。

有人告诉我 dataReceived() 不应该是 @defer.inlineCallbacks。但如果我去掉那个装饰,它不会改变任何东西。

我正在考虑 gevent 是否可以帮助我摆脱这种无法预料的行为。我被扭曲成无尽的龙卷风,旋风......

哪位有类似经历的,请帮帮我。谢谢。

【问题讨论】:

    标签: redis twisted


    【解决方案1】:

    通过如下改变函数,代码就可以工作了。

    #COMMENT OUT decorator of @defer.inlineCallbacks
    
    def dataReceived(self, data):
        global dbpool, rcs
        (self.snrCode,rDat,pDat) = getDataParsed(data)
    
        if self.snrCode == None or rDat == None or pDat == None:
            err = "Bad format"
        else:
            err = "OK"
        print "err:%s"%(err) # debug print to show flow control
        self.err = err 
    
        self.transport.write(self.snrCode)
        self.transport.write(self.err)
        self.transport.write(rDat)
        self.transport.write(pDat) 
        self.transport.loseConnection()
    
        if self.snrCode != None and rDat != None and pDat != None:    
            self.saveRealTimeData(rcs, rDat)        
            self.savePeriodData(dbpool, pDat, conf)
            # Removing yield before DB ops
    
        print "err2:%s"%(err)  # debug print to show flow control
    
    
    @defer.inlineCallbacks
    def saveRealTimeData(self, rc, dat):
        print "saveRedis"
        key = "somekey"
        val = "somedata"
        yield rc.set(key,val)
        yield rc.expire(key,30)
    
    @defer.inlineCallbacks
    def savePeriodData(self,rc,dat,conf):
        print "save SQL"
        query = "some SQL statement"
        yield rc.runQuery(query)
    

    如果我们在 dataReceived 中保留 @defer.inlineCallbacks 和 yield。连接在第二个 DB 操作之前关闭。因此没有数据输出到连接。可能是由 inlineCallbacks 装饰器引起的。

    通过删除它,流控制变得简单明了。

    但是,如果有两个延迟的数据库操作,我仍然可以理解为什么我不能添加 inlineCallbacks。这次他们不需要延期了吗?

    【讨论】:

    • 此时,我得到了答案,但不知道为什么。
    猜你喜欢
    • 1970-01-01
    • 2021-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-19
    • 2019-07-10
    • 2019-10-09
    • 2013-09-24
    相关资源
    最近更新 更多