【问题标题】:Creating a processing queue in Tornado在 Tornado 中创建处理队列
【发布时间】:2015-03-30 18:57:29
【问题描述】:

我正在使用 Tornado Web 服务器对需要在请求/响应周期之外处理的项目进行排队。

在下面的简化示例中,每次收到请求时,我都会将一个新字符串添加到名为 queued_items 的列表中。我想创建一些东西来监视该列表并在它们显示在列表中时对其进行处理。

(在我的真实代码中,项目通过 TCP 套接字进行处理和发送,当 Web 请求到达时,该套接字可能连接也可能不连接。我希望 Web 服务器继续排队项目,而不管套接字连接如何)

我试图让这段代码保持简单,而不是使用 Redis 或 Beanstalk 等外部队列/程序。它不会有很大的音量。

使用 Tornado 习语查看 client.queued_items 列表中的新项目并在它们到达时处理它们的好方法是什么?

import time

import tornado.ioloop
import tornado.gen
import tornado.web

class Client():

    def __init__(self):
        self.queued_items = []

    @tornado.gen.coroutine
    def watch_queue(self):
        # I have no idea what I'm doing
        items = yield client.queued_items
        # go_do_some_thing_with_items(items)

class IndexHandler(tornado.web.RequestHandler):

    def get(self):
        client.queued_items.append("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":

    client = Client()

    # Watch the queue for when new items show up
    client.watch_queue()

    # Create the web server 
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)

    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

【问题讨论】:

    标签: python tornado


    【解决方案1】:

    有一个名为toro 的库,它为tornado 提供同步原语。 [更新:从 tornado 4.2 开始,toro 已合并到 tornado。]

    听起来您可以只使用toro.Queue(或tornado 4.2+ 中的tornado.queues.Queue)来处理这个问题:

    import time
    
    import toro
    import tornado.ioloop
    import tornado.gen
    import tornado.web
    
    class Client():
    
        def __init__(self):
            self.queued_items = toro.Queue()
    
        @tornado.gen.coroutine
        def watch_queue(self):
            while True:
                items = yield self.queued_items.get()
                # go_do_something_with_items(items)
    
    class IndexHandler(tornado.web.RequestHandler):
    
        @tornado.gen.coroutine
        def get(self):
            yield client.queued_items.put("%f" % time.time())
            self.write("Queued a new item")
    
    if __name__ == "__main__":
    
        client = Client()
    
        # Watch the queue for when new items show up
        tornado.ioloop.IOLoop.current().add_callback(client.watch_queue)
    
        # Create the web server 
        application = tornado.web.Application([
            (r'/', IndexHandler),
        ], debug=True)
    
        application.listen(8888)
        tornado.ioloop.IOLoop.current().start()
    

    除了将数据结构从列表切换到toro.Queue之外,还需要进行一些调整:

    1. 我们需要安排watch_queue 使用add_callback 在IOLoop 内部运行,而不是尝试在IOLoop 上下文之外直接调用它。
    2. IndexHandler.get 需要转换为协程,因为toro.Queue.put 是协程。

    我还在watch_queue 中添加了一个while True 循环,这样它将永远运行,而不是只处理一个项目然后退出。

    【讨论】:

    • 这正是我所需要的。感谢您向我展示如何实现它。
    • Dano - 我怎样才能停止观看队列?当我的连接变坏时,我需要暂时停止处理队列中的项目,但不要丢失它们。
    • toro 已合并到龙卷风中,现已弃用。对于龙卷风 >= 4.2,您可以使用 tornado.queues.Queue
    • @MattiJohn 谢谢,我已经相应地更新了答案。
    • 我正在使用此代码并在go_do_something_with_items(items) 添加了睡眠功能。现在,如果我尝试快速连续调用 api,则 api 会被阻止,程序会一直等到前一个请求完成。我怎样才能有并发访问?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-12-14
    • 2018-10-26
    • 1970-01-01
    • 1970-01-01
    • 2016-09-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多