【问题标题】:Streaming HTTP body over FTP with twisted通过 FTP 流式传输 HTTP 正文
【发布时间】:2015-03-23 11:05:10
【问题描述】:

我有一个自定义 FTP 服务器,它与 api 对话以获取文件夹列表等,并且文件作为 api 的 url 返回。我正在尝试打开这些 url 的 http 流并通过 ftp 将数据反馈给客户端(以非阻塞方式),但我不知道如何连接它。

我试图整理一个最小的例子来更好地解释我的问题。在示例中,它在端口 2121 上启动了一个本地 FTP 服务器,该服务器列出了本地文件系统,但在下载文件时,它返回 www.yahoo.com 的内容正文而不是文件数据。

我尝试通过io.BytesIO 对象缓冲数据,但没有发送任何数据。我想知道这是否是正确的方法,还是因为读取指针可能总是在文件对象的末尾?

示例代码如下:

import io
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell, _FileReader
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess, FilePasswordDB
from twisted.internet import defer

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])

class StreamWriter(Protocol):
    def __init__(self, finished, stream):
        self.finished = finished
        self.stream = stream

    def dataReceived(self, bytes):
        self.stream.write(bytes)

    def connectionLost(self, reason):
        print 'Finished receiving body:', reason.type, reason.value
        self.finished.callback(None)


def streamBody(response, stream):
    finished = Deferred()
    response.deliverBody(StreamWriter(finished, stream))
    return finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")

    stream = io.BytesIO()
    d.addCallback(lambda resp: streamBody(resp, stream))
    d.addErrback(log.err)

    return defer.succeed(_FileReader(stream))

def main():

    FTPAnonymousShell.openForReading = openForReading

    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])

    f = FTPFactory(p)

    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()

编辑

class FinishNotifier(ProtocolToConsumerAdapter, Protocol):
    def __init__(self, original):
        ProtocolToConsumerAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")

    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)

    return d

更新的可运行示例:

from twisted.python import log
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ConsumerToProtocolAdapter, connectionDone
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder, HTTPConnectionPool, HTTPClientFactory
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess
from twisted.internet import defer
from twisted.internet.error import ConnectionDone
from twisted.web._newclient import ResponseDone

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
# pool= HTTPConnectionPool(reactor,persistent=True)
# pool.maxPersistentPerHost = 2
# agent= Agent(reactor,pool=pool,connectTimeout=5)

class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        print "finished"
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        print consumer
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.testtest.com")
    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)

    return d

def main():

    FTPAnonymousShell.openForReading = openForReading

    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])

    f = FTPFactory(p)

    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()

【问题讨论】:

    标签: python python-2.7 ftp streaming twisted


    【解决方案1】:

    如果是因为读指针总是在文件对象的末尾?

    大概是这样。你有两件事同时发生。 HTTP 客户端正在写入BytesIO 实例,而 FTP 客户端正在从中读取。 _FileReader(一个私有 API,一个 Twisted 的 FTP 库的实现细节,不是你应该实际使用的东西)用于从一个已经完成的文件中读取 - 而不是一个正在增长的文件因为它正在被读取来自。

    幸运的是,无需通过异步不友好的file 接口。查看openForReading 应该返回的类型——IReadFile 提供程序。 IReadFile 有一个方法send,它接受一个提供IConsumer 的对象。

    另一方面,您有deliverBody,它接受IProtocol。该协议具有传递给它的 HTTP 响应正文。这是您要传递给IReadFile.sendIConsumer 的数据。

    因此,与其尝试使这两个部分与BytesIO 一起工作,不如使用所涉及的两个接口使它们一起工作:IProtocolIConsumer。这是一个草图(其中有很多错误,但总体形状是正确的):

    from twisted.internet.protocol import ConsumerToProtocolAdapter
    from twisted.internet.interfaces import IPushProducer
    from twisted.protocols.ftp import IReadFile
    
    class FinishNotifier(ConsumerToProtocolAdapter):
        def connectionLost(self, reason):
            reason.trap(ConnectionDone)
            self.finished.callback(None)
    
    @implementer(IReadFile, IPushProducer)
    class HTTP2FTP(object):
        def send(self, consumer):
            protocol = FinishNotifier(consumer)
            response.deliverBody(protocol)
            # Lazy hack.
            # This code probably belongs in protocol.connectionMade instead.
            self._producer = protocol.transport
            consumer.registerProducer(self._producer, streaming=True)
            protocol.finished.addCallback(
                lambda ignored: consumer.unregisterProducer()
            )
            return protocol.finished
    
        def pauseProducing(self):
            self._producer.pauseProducing()
    
        def resumeProducing(self):
            self._producer.resumeProducing()
    
        def stopProducing(self):
            self._producer.stopProducing()
    

    请注意,通过在此处实现IPushProducer,我们可以控制 HTTP 和 FTP 连接之间的流量(这样服务器上的内存使用就会受到限制,即使 HTTP 连接传输数据的速度比 FTP 连接快得多)。这是一件很酷的事情,很高兴它只需要额外的几行来实现。稍微不那么酷的是您必须在正确的时间拨打unregisterProducer。 FTP 协议实现将此用作数据已完全传输的指示。这可能在 Twisted 中没有充分记录(这是一个应该纠正的疏忽)。

    【讨论】:

    • 非常感谢@Jean-Paul。我想我正在尝试返回一个需要像对象这样的文件的_FileReader。你的回答很有道理。我似乎仍然无法让它正常工作 - 我返回了空文件,但我不知道为什么,我将用我尝试过的内容编辑我的问题
    • 刚刚使用更新的可运行示例编辑了问题,无法弄清楚为什么我仍然没有获得任何数据 - 看起来它应该对我有用。在我添加 HTTP2FTP 的地方,是否需要在此时返回 deferred?
    • 当我运行示例时,justdevelop.it 会回复一个没有内容长度的 HTTP/1.0 响应。所以connectionLost 是用PotentialDataLoss 调用的,所以传输永远不会被识别为完成(因为connectionLost 不处理该异常类型)。
    • 啊-我更改了网址。我想我可以在陷阱中添加PotentialDataLoss 或者添加一个返回内容长度的url 抱歉!
    • 我将使用 API 来获取文件的 url 都有内容长度,所以我不必担心
    猜你喜欢
    • 2017-05-07
    • 1970-01-01
    • 1970-01-01
    • 2012-03-30
    • 1970-01-01
    • 2012-04-25
    • 1970-01-01
    • 2017-10-27
    • 1970-01-01
    相关资源
    最近更新 更多