【问题标题】:Tornado celery integration hacks龙卷风芹菜集成黑客
【发布时间】:2011-11-20 14:46:37
【问题描述】:
由于没有人为 this post 提供解决方案,而且我迫切需要解决方法,所以这里是我的情况和一些抽象的解决方案/想法供辩论。
我的堆栈:
- 龙卷风
- 芹菜
- MongoDB
- Redis
- RabbitMQ
我的问题:为 Tornado 找到一种方法来调度 celery 任务(已解决),然后异步收集结果(有什么想法吗?)。
场景 1:(请求/响应 hack 加 webhook)
- Tornado 收到一个(用户)请求,然后在本地内存(或 Redis 中)保存一个 { jobID : (user)request} 以记住将响应传播到哪里,并使用 jobID 触发一个 celery 任务
- 当 celery 完成任务时,它会在某个 url 上执行一个 webhook 并告诉 tornado 这个 jobID 已经完成(加上结果)
- Tornado 检索(用户)请求并将响应转发给(用户)
这会发生吗?有什么逻辑吗?
场景 2:(龙卷风加长轮询)
- Tornado 调度 celery 任务并将一些主要的 json 数据返回给客户端(jQuery)
- jQuery 会在收到主 json 后进行一些长轮询,例如,每隔 x 微秒,然后 tornado 会根据某些数据库标志进行回复。当 celery 任务完成时,这个数据库标志设置为 True,然后 jQuery“循环”结束。
这样有效吗?
还有其他想法/模式吗?
【问题讨论】:
标签:
python
celery
tornado
【解决方案1】:
我的解决方案是从龙卷风轮询到芹菜:
class CeleryHandler(tornado.web.RequestHandlerr):
@tornado.web.asynchronous
def get(self):
task = yourCeleryTask.delay(**kwargs)
def check_celery_task():
if task.ready():
self.write({'success':True} )
self.set_header("Content-Type", "application/json")
self.finish()
else:
tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
这里是post。
【解决方案2】:
这是我们解决问题的方法。由于我们在应用程序中查找多个处理程序的结果,因此我们将 celery 查找设置为 mixin 类。
这也使 tornado.gen 模式的代码更具可读性。
from functools import partial
class CeleryResultMixin(object):
"""
Adds a callback function which could wait for the result asynchronously
"""
def wait_for_result(self, task, callback):
if task.ready():
callback(task.result)
else:
# TODO: Is this going to be too demanding on the result backend ?
# Probably there should be a timeout before each add_callback
tornado.ioloop.IOLoop.instance().add_callback(
partial(self.wait_for_result, task, callback)
)
class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
"""Execute a task asynchronously over a celery worker.
Wait for the result without blocking
When the result is available send it back
"""
@tornado.web.asynchronous
@tornado.web.authenticated
@tornado.gen.engine
def post(self):
"""Test the provided Magento connection
"""
task = expensive_task.delay(
self.get_argument('somearg'),
)
result = yield tornado.gen.Task(self.wait_for_result, task)
self.write({
'success': True,
'result': result.some_value
})
self.finish()
【解决方案3】:
我偶然发现了这个问题,并且反复点击结果后端对我来说并不是最佳选择。所以我使用 Unix Sockets 实现了一个类似于你的场景 1 的 Mixin。
它会在任务完成后立即通知 Tornado(准确地说,是在链中的下一个任务运行时)并且只命中后端结果一次。这是link。
【解决方案4】:
现在,https://github.com/mher/tornado-celery 来救援了……
class GenAsyncHandler(web.RequestHandler):
@asynchronous
@gen.coroutine
def get(self):
response = yield gen.Task(tasks.sleep.apply_async, args=[3])
self.write(str(response.result))
self.finish()