【问题标题】:Background tasks in flask烧瓶中的后台任务
【发布时间】:2016-12-06 07:07:01
【问题描述】:

我正在编写一个 Web 应用程序,它会做一些繁重的工作。考虑到这一点,我想将这些任务设为后台任务(非阻塞),这样其他请求就不会被之前的请求阻塞。

我将线程妖魔化,以便在主线程(因为我使用threaded=True)完成后它不会退出,现在如果用户发送请求,我的代码将立即告诉他们他们的请求在进度,它将在后台运行,并且应用程序已准备好处理其他请求。

我当前的应用程序代码如下所示:

from flask import Flask
from flask import request
import threading

class threadClass:

    def __init__(self):
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True                       # Daemonize thread
        thread.start()                             # Start the execution

    def run(self):

         #
         # This might take several minutes to complete
         someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
    try:
        begin = threadClass()
    except:
        abort(500)

    return "Task is in progress"

def main():
    """
    Main entry point into program execution

    PARAMETERS: none
    """
    app.run(host='0.0.0.0',threaded=True)

main()

我只是希望它能够处理一些并发请求(它不会在生产中使用)

我可以做得更好吗?我错过了什么吗?我正在浏览python的多线程包,发现了这个

multiprocessing 是一个支持使用 API 类似于 threading 模块。多处理包 提供本地和远程并发,有效地回避 通过使用子进程而不是线程来全局解释器锁。 因此,多处理模块允许程序员完全 利用给定机器上的多个处理器。它在两个 Unix 上运行 和窗户。

我可以使用多处理来妖魔化进程吗?我怎样才能比使用 threading 模块做得更好?

##编辑

我通过python的多处理包,它类似于线程。

from flask import Flask
from flask import request
from multiprocessing import Process

class processClass:

    def __init__(self):
        p = Process(target=self.run, args=())
        p.daemon = True                       # Daemonize it
        p.start()                             # Start the execution

    def run(self):

         #
         # This might take several minutes to complete
         someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
    try:
        begin = processClass()
    except:
        abort(500)

    return "Task is in progress"

def main():
    """
    Main entry point into program execution

    PARAMETERS: none
    """
    app.run(host='0.0.0.0',threaded=True)

main()

上面的方法好看吗?

【问题讨论】:

标签: python multithreading flask


【解决方案1】:

最佳实践

在烧瓶中实现后台任务的最佳方法是使用 Celery,如 this SO post 中所述。一个好的起点是官方的Flask documentationCelery documentation

疯狂方式:构建自己的装饰器

正如@MrLeeh 在评论中指出的那样,Miguel Grinberg 在他的Pycon 2016 talk 中通过实现装饰器提出了解决方案。我想强调的是,我对他的解决方案怀有最高的敬意;他自己称其为“疯狂的解决方案”。下面的代码是对his solution的小改编。

警告!!!

不要在生产环境中使用它!主要原因是这个应用程序使用全局tasks 字典存在内存泄漏。即使您解决了内存泄漏问题,维护这种代码也很困难。如果您只是想在私人项目中玩耍或使用它,请继续阅读。

小例子

假设您在 /foo 端点中有一个长时间运行的函数调用。我用 10 秒 sleep 计时器来模拟这个。如果调用 enpoint 3 次,则需要 30 秒才能完成。

Miguel Grinbergs 装饰器解决方案在 flask_async 中实现。它在与当前 Flask 上下文相同的 Flask 上下文中运行一个新线程。每个线程都会发出一个新的task_id。结果保存在全局字典tasks[task_id]['result']中。

使用装饰器后,您只需使用@flask_async 装饰端点,并且端点是异步的——就像那样!

import threading
import time
import uuid
from functools import wraps

from flask import Flask, current_app, request, abort
from werkzeug.exceptions import HTTPException, InternalServerError

app = Flask(__name__)
tasks = {}


def flask_async(f):
    """
    This decorator transforms a sync route to asynchronous by running it in a background thread.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        def task(app, environ):
            # Create a request context similar to that of the original request
            with app.request_context(environ):
                try:
                    # Run the route function and record the response
                    tasks[task_id]['result'] = f(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['result'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['result'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task': threading.Thread(
            target=task, args=(current_app._get_current_object(), request.environ))}
        tasks[task_id]['task'].start()

        # Return a 202 response, with an id that the client can use to obtain task status
        return {'TaskId': task_id}, 202

    return wrapped


@app.route('/foo')
@flask_async
def foo():
    time.sleep(10)
    return {'Result': True}


@app.route('/foo/<task_id>', methods=['GET'])
def foo_results(task_id):
    """
        Return results of asynchronous task.
        If this request returns a 202 status code, it means that task hasn't finished yet.
        """
    task = tasks.get(task_id)
    if task is None:
        abort(404)
    if 'result' not in task:
        return {'TaskID': task_id}, 202
    return task['result']


if __name__ == '__main__':
    app.run(debug=True)

但是,您需要一些小技巧才能获得结果。端点/foo 只会返回 HTTP 代码 202 和任务 id,而不是结果。您需要另一个端点 /foo/&lt;task_id&gt; 来获得结果。以下是 localhost 的示例:

import time
import requests

task_ids = [requests.get('http://127.0.0.1:5000/foo').json().get('TaskId')
            for _ in range(2)]
time.sleep(11)
results = [requests.get(f'http://127.0.0.1:5000/foo/{task_id}').json()
           for task_id in task_ids]
# [{'Result': True}, {'Result': True}]

【讨论】:

    猜你喜欢
    • 2022-01-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-16
    • 2020-03-16
    相关资源
    最近更新 更多