【问题标题】:Tornado celery integration hacks龙卷风芹菜集成黑客
【发布时间】:2011-11-20 14:46:37
【问题描述】:

由于没有人为 this post 提供解决方案,而且我迫切需要解决方法,所以这里是我的情况和一些抽象的解决方案/想法供辩论。

我的堆栈:

  1. 龙卷风
  2. 芹菜
  3. MongoDB
  4. Redis
  5. RabbitMQ

我的问题:为 Tornado 找到一种方法来调度 celery 任务(已解决),然后异步收集结果(有什么想法吗?)。

场景 1:(请求/响应 hack 加 webhook)

  • Tornado 收到一个(用户)请求,然后在本地内存(或 Redis 中)保存一个 { jobID : (user)request} 以记住将响应传播到哪里,并使用 jobID 触发一个 celery 任务
  • 当 celery 完成任务时,它会在某个 url 上执行一个 webhook 并告诉 tornado 这个 jobID 已经完成(加上结果)
  • Tornado 检索(用户)请求并将响应转发给(用户)

这会发生吗?有什么逻辑吗?

场景 2:(龙卷风加长轮询)

  • Tornado 调度 celery 任务并将一些主要的 json 数据返回给客户端(jQuery)
  • jQuery 会在收到主 json 后进行一些长轮询,例如,每隔 x 微秒,然后 tornado 会根据某些数据库标志进行回复。当 celery 任务完成时,这个数据库标志设置为 True,然后 jQuery“循环”结束。

这样有效吗?

还有其他想法/模式吗?

【问题讨论】:

    标签: python celery tornado


    【解决方案1】:

    我的解决方案是从龙卷风轮询到芹菜:

    class CeleryHandler(tornado.web.RequestHandlerr):
    
        @tornado.web.asynchronous
        def get(self):    
    
            task = yourCeleryTask.delay(**kwargs)
    
            def check_celery_task():
                if task.ready():
                    self.write({'success':True} )
                    self.set_header("Content-Type", "application/json")  
                    self.finish()
                else:   
                    tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
    
            tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
    

    这里是post

    【讨论】:

    • 能否请您重新发布您的博客链接,它已被删除!
    • 编辑为archive.org链接
    【解决方案2】:

    这是我们解决问题的方法。由于我们在应用程序中查找多个处理程序的结果,因此我们将 celery 查找设置为 mixin 类。

    这也使 tornado.gen 模式的代码更具可读性。

    from functools import partial
    
    class CeleryResultMixin(object):
        """
        Adds a callback function which could wait for the result asynchronously
        """
        def wait_for_result(self, task, callback):
            if task.ready():
                callback(task.result)
            else:
                # TODO: Is this going to be too demanding on the result backend ?
                # Probably there should be a timeout before each add_callback
                tornado.ioloop.IOLoop.instance().add_callback(
                    partial(self.wait_for_result, task, callback)
                )
    
    
    class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
        """Execute a task asynchronously over a celery worker.
        Wait for the result without blocking
        When the result is available send it back
        """
        @tornado.web.asynchronous
        @tornado.web.authenticated
        @tornado.gen.engine
        def post(self):
            """Test the provided Magento connection
            """
            task = expensive_task.delay(
                self.get_argument('somearg'),
            )
    
            result = yield tornado.gen.Task(self.wait_for_result, task)
    
            self.write({
                'success': True,
                'result': result.some_value
            })
            self.finish()
    

    【讨论】:

      【解决方案3】:

      我偶然发现了这个问题,并且反复点击结果后端对我来说并不是最佳选择。所以我使用 Unix Sockets 实现了一个类似于你的场景 1 的 Mixin。

      它会在任务完成后立即通知 Tornado(准确地说,是在链中的下一个任务运行时)并且只命中后端结果一次。这是link

      【讨论】:

        【解决方案4】:

        现在,https://github.com/mher/tornado-celery 来救援了……

        class GenAsyncHandler(web.RequestHandler):
            @asynchronous
            @gen.coroutine
            def get(self):
                response = yield gen.Task(tasks.sleep.apply_async, args=[3])
                self.write(str(response.result))
                self.finish()
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多