【发布时间】:2015-03-23 11:05:10
【问题描述】:
我有一个自定义 FTP 服务器,它与 api 对话以获取文件夹列表等,并且文件作为 api 的 url 返回。我正在尝试打开这些 url 的 http 流并通过 ftp 将数据反馈给客户端(以非阻塞方式),但我不知道如何连接它。
我试图整理一个最小的例子来更好地解释我的问题。在示例中,它在端口 2121 上启动了一个本地 FTP 服务器,该服务器列出了本地文件系统,但在下载文件时,它返回 www.yahoo.com 的内容正文而不是文件数据。
我尝试通过io.BytesIO 对象缓冲数据,但没有发送任何数据。我想知道这是否是正确的方法,还是因为读取指针可能总是在文件对象的末尾?
示例代码如下:
import io
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell, _FileReader
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess, FilePasswordDB
from twisted.internet import defer
agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
class StreamWriter(Protocol):
def __init__(self, finished, stream):
self.finished = finished
self.stream = stream
def dataReceived(self, bytes):
self.stream.write(bytes)
def connectionLost(self, reason):
print 'Finished receiving body:', reason.type, reason.value
self.finished.callback(None)
def streamBody(response, stream):
finished = Deferred()
response.deliverBody(StreamWriter(finished, stream))
return finished
def openForReading(self, path):
d = agent.request("GET", "http://www.yahoo.com")
stream = io.BytesIO()
d.addCallback(lambda resp: streamBody(resp, stream))
d.addErrback(log.err)
return defer.succeed(_FileReader(stream))
def main():
FTPAnonymousShell.openForReading = openForReading
p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])
f = FTPFactory(p)
reactor.listenTCP(2121, f)
reactor.run()
if __name__ == "__main__":
main()
编辑
class FinishNotifier(ProtocolToConsumerAdapter, Protocol):
def __init__(self, original):
ProtocolToConsumerAdapter.__init__(self, original)
self.finished = defer.Deferred()
def connectionLost(self, reason=connectionDone):
reason.trap(ConnectionDone, ResponseDone)
self.finished.callback(None)
class HTTP2FTP(object):
def __init__(self, response):
self.response = response
def send(self, consumer):
protocol = FinishNotifier(consumer)
self.response.deliverBody(protocol)
return protocol.finished
def openForReading(self, path):
d = agent.request("GET", "http://www.yahoo.com")
d.addCallback(HTTP2FTP)
d.addErrback(log.err)
return d
更新的可运行示例:
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ConsumerToProtocolAdapter, connectionDone
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder, HTTPConnectionPool, HTTPClientFactory
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess
from twisted.internet import defer
from twisted.internet.error import ConnectionDone
from twisted.web._newclient import ResponseDone
agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
# pool= HTTPConnectionPool(reactor,persistent=True)
# pool.maxPersistentPerHost = 2
# agent= Agent(reactor,pool=pool,connectTimeout=5)
class FinishNotifier(ConsumerToProtocolAdapter):
def __init__(self, original):
ConsumerToProtocolAdapter.__init__(self, original)
self.finished = defer.Deferred()
def connectionLost(self, reason=connectionDone):
reason.trap(ConnectionDone, ResponseDone)
print "finished"
self.finished.callback(None)
class HTTP2FTP(object):
def __init__(self, response):
self.response = response
def send(self, consumer):
print consumer
protocol = FinishNotifier(consumer)
self.response.deliverBody(protocol)
return protocol.finished
def openForReading(self, path):
d = agent.request("GET", "http://www.testtest.com")
d.addCallback(HTTP2FTP)
d.addErrback(log.err)
return d
def main():
FTPAnonymousShell.openForReading = openForReading
p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])
f = FTPFactory(p)
reactor.listenTCP(2121, f)
reactor.run()
if __name__ == "__main__":
main()
【问题讨论】:
标签: python python-2.7 ftp streaming twisted