一个基本但可行的解决方案(gist):
您可以通过仅从排队作业的路由重定向来做到这一点,然后让元标记定期刷新该页面。首先导入需要的库:
from flask import Flask, redirect, url_for, render_template_string
app = Flask(__name__)
from time import sleep
from rq import Queue
from rq.job import Job
from redis import Redis
设置rq相关连接,并定义要运行的函数:
r = Redis(host='redisserver')
q = Queue(connection=r)
def slow_func(data):
sleep(5)
return 'Processed %s' % (data,)
然后定义一个可以每5秒刷新一次页面的模板:
template_str='''<html>
<head>
{% if refresh %}
<meta http-equiv="refresh" content="5">
{% endif %}
</head>
<body>{{result}}</body>
</html>'''
我们还将使用flask render_template_string 创建一个辅助函数来返回插入了变量的模板。请注意,如果未提供刷新,则默认为 False:
def get_template(data, refresh=False):
return render_template_string(template_str, result=data, refresh=refresh)
现在创建一个路由,将我们的函数排入队列,获取它的 rq job-id,然后使用 id 返回重定向到 result 视图。这只是在 URL 字符串中输入,但可以从任何地方获取:
@app.route('/process/<string:data>')
def process(data):
job = q.enqueue(slow_func, data)
return redirect(url_for('result', id=job.id))
现在让我们在rq.Job 对象的帮助下处理实际结果。这里的逻辑可以调整,因为这将导致除"finished"之外的所有值刷新页面:
@app.route('/result/<string:id>')
def result(id):
job = Job.fetch(id, connection=r)
status = job.get_status()
if status in ['queued', 'started', 'deferred', 'failed']:
return get_template(status, refresh=True)
elif status == 'finished':
result = job.result
# If this is a string, we can simply return it:
return get_template(result)
如果状态为"finished",那么job.result将包含slow_func的返回值,所以我们在页面上呈现它。
此方法的缺点是在等待作业完成时会向服务器发出多个请求。元刷新标签可能有点不合常规。如果您从 Javascript 发送更新请求,那么有 solutions 可以间隔发送 AJAX 请求,尽管这会遇到相同的多请求问题。
另一种方法是使用 websockets 或 SSE 将已完成作业的结果在完成后立即流式传输到前端。
更新日期:2021 年 2 月 27 日
我决定尝试使用 SSE 方法更新前端的作业状态。我了解到rq 原生支持更新作业中的meta 属性,方法是在作业中导入rq.get_current_job,然后可以在作业刷新后从外部访问。
查看演示代码: