【问题标题】:Twisted / perform asynchronous http requestsTwisted / 执行异步 http 请求
【发布时间】:2013-05-14 10:36:34
【问题描述】:

我有一个扭曲的反应器来监听传入的数据。我有第二个反应器在特定时间间隔内执行 http 请求,将结果发送到第一个反应器。两者都运行良好。

现在我想把它放在一个反应​​堆中运行,但我不知道如何实现。类似的东西 - 每 60 秒执行一次 http 请求。在第一个监听的“主”反应器中以异步方式。

我现在拥有的是:

# main reactor listening for incoming data forever
...
reactor.listenTCP(8123, TCPEventReceiverFactory())

http reactor 使用twisted.internet.defer.DeferredSemaphore() 执行多个http 检查:

# create semaphore to manage the deferreds
semaphore = twisted.internet.defer.DeferredSemaphore(2)

# create a list with all urls to check
dl = list()
# append deferreds to list
for url in self._urls:
    # returns deferred
    dl.append(semaphore.run(self._getPage, url))

# get a DefferedList
dl = twisted.internet.defer.DeferredList(dl)
# add some callbacks for error handling
dl.addCallbacks(lambda x: reactor.stop(), self._handleError)

# start the reactor    
reactor.run()

如何将定时 http 检查添加到“主”反应器,以便它们以异步方式执行? DeferredSemaphore 究竟是如何工作的?

谁能帮我解决这个问题?

[这是一种处理http校验结果的轻量级监控系统。我是 Twisted 和异步编程的新手。我在运行 Python 2.7 的 Xubuntu 12.04]

【问题讨论】:

  • 什么是延迟信号量?
  • 用于限制同时执行的请求。

标签: python asynchronous python-2.7 twisted


【解决方案1】:

您不需要多个反应器。只需使用同一个反应器执行所有不同的操作。

如果你调用reactor.stop(),你很可能做错了什么,所以让我们摆脱它,将它全部绑定到一个函数中(我们可以将其用作回调);因为它在做异步工作,它也应该返回一个延迟的,我们将使用你已经在使用的DeferredList

def thing_that_does_http():
    # create semaphore to manage the deferreds
    semaphore = twisted.internet.defer.DeferredSemaphore(2)

    # create a list with all urls to check
    dl = DeferredList()
    # append deferreds to list
    for url in self._urls:
        # returns deferred
        dl.append(semaphore.run(self._getPage, url))

    # get a DefferedList
    dl = twisted.internet.defer.DeferredList(dl)
    # add some callbacks for error handling
    dl.addErrback(self._handleError)
    return dl

“在特定时间间隔内执行 x”的自然方式是循环调用。有了这个回调函数,我们就不需要做太多了

reactor.listenTCP(8123, TCPEventReceiverFactory())
loop_http = twisted.intertnet.task.LoopingCall(thing_that_does_http)
# run once per minute, starting now.
loop_http.start(60)

反应器LoopingCallgetPage 将用于他们自己的目的是twisted.internet.reactor,如果您使用不同的反应器,例如,如果您正在执行单元测试,则需要覆盖该默认值。

LoopingCall为例,很简单,在构造之后(但调用它的start()方法之前),设置它的clock属性:

from twisted.internet.task import Clock
fake_reactor = Clock()
loop_http.clock = fake_reactor
fake_reactor.advance(120)  # move time forward two minutes...

不幸的是,getPage() 的情况不太好。您不能使用具有该接口的任何其他反应器;您需要使用更新的、更闪亮的t.w.c.Agent。在许多方面Agent 是优越的,但当您只希望原始响应正文作为字符串时,它就不太方便了。

除了要求将显式反应器传递给其构造函数之外,它更多的是对请求/响应周期的细粒度控制,而不是 getPage 提供的便利。因此,它主要以Producers 和Protocols 的形式实现。在前者的情况下,我们可以通过一个方便的助手FileBodyProducer 以最小的麻烦发送请求正文;在后者中,我们需要一个简单的协议来缓冲所有数据块,直到我们得到所有数据。

这里有一段代码可以替换getPage,接口大致相同,但将Agent的实例作为第一个参数

from cStringIO import StringIO
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
from twisted.web.client import FileBodyProducer


class GetPageProtocol(Protocol):
    def __init__(self):
        self.deferred = Deferred()
        self.data = []

    def dataReceived(self, data):
        self.data.append(data)

    def connectionLost(self, reason):
        reason.trap(ResponseDone)
        data = ''.join(self.data)
        del self.data
        self.deferred.callback(data)


def agentGetPage(agent, url,
                 method="GET",
                 headers=None,
                 postdata=None):
    if postdata is not None:
        bodyProducer = FileBodyProducer(StringIO(postdata))
    else:
        bodyProducer = None

    def _getPageResponded(response):
        if response.length != 0:
            proto = GetPageProtocol()
            response.deliverBody(proto)
            return proto.deferred
        else:
            return None

    d = agent.request(method, url, headers, bodyProducer)
    d.addCallback(_getPageResponded)
    return d

在单元测试中,它看起来有点像:

from twisted.test.proto_helpers import MemoryReactor
from twisted.web.client import Agent
fake_reactor = MemoryReactor()
agent = Agent(fake_reactor)
d = agentGetPage(agent, "http://example.com")

assert fake_reactor.tcpClients  # or some such, exercise the code by manipulating the reactor

编辑:我最初想略过这个给ectomorph,不要混淆;但是,尽早正确处理反应器也是一个不错的主意,然后避免不必要的痛苦。

【讨论】:

  • 谢谢!太棒了,这正是我想要的。 twisted.internet.reactor 是我正在使用的反应器。
  • 很好的答案,但并不完全正确:)。 LoopingCall 实际上会使用self.clock,它只是初始化为twisted.internet.reactor默认情况下。改变它的能力很重要,尤其是对于测试。 (遗憾的是,getPage 被有效地硬编码为它,这也是我们现在推荐twisted.web.client.Agent 的原因之一。)
  • @Glyph:更新:在那里,我认为这涵盖了使用 reactor 不那么糟糕。
  • 谢谢你们的完成!
  • @TokenMacGuy 感谢您为后代清理答案!如果你觉得更加更慷慨,实际上有一张票可以向 Twisted 本身添加一个像 agentGetPage 这样的函数,以弥合这种便利性差距。有没有机会你有兴趣研究它? twistedmatrix.com/trac/ticket/5405
猜你喜欢
  • 2015-10-24
  • 1970-01-01
  • 2018-12-05
  • 2012-04-03
  • 1970-01-01
  • 2022-11-28
  • 1970-01-01
  • 2017-10-19
  • 1970-01-01
相关资源
最近更新 更多