【问题标题】:flask application with background threads带有后台线程的烧瓶应用程序
【发布时间】:2014-05-02 04:02:10
【问题描述】:

我正在创建一个烧瓶应用程序,对于一个请求,我需要运行一些不需要在 UI 上等待的长时间运行的作业。我将创建一个线程并向 UI 发送一条消息。该线程将计算和更新数据库。但是,UI 会在提交时看到一条消息。 下面是我的实现,但它正在运行线程,然后将输出发送到我不喜欢的 UI。我怎样才能在后台运行这个线程?

@app.route('/someJob')
def index():
    t1 = threading.Thread(target=long_running_job)
    t1.start()
    return 'Scheduled a job'

def long_running_job
    #some long running processing here

如何让线程 t1 在后台运行并立即发送消息作为回报?

【问题讨论】:

    标签: python multithreading flask


    【解决方案1】:

    试试这个例子,在 Python 3.4.3 / Flask 0.11.1 上测试

    from flask import Flask
    from time import sleep
    from concurrent.futures import ThreadPoolExecutor
    
    # DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
    executor = ThreadPoolExecutor(2)
    
    app = Flask(__name__)
    
    
    @app.route('/jobs')
    def run_jobs():
        executor.submit(some_long_task1)
        executor.submit(some_long_task2, 'hello', 123)
        return 'Two jobs were launched in background!'
    
    
    def some_long_task1():
        print("Task #1 started!")
        sleep(10)
        print("Task #1 is done!")
    
    
    def some_long_task2(arg1, arg2):
        print("Task #2 started with args: %s %s!" % (arg1, arg2))
        sleep(5)
        print("Task #2 is done!")
    
    
    if __name__ == '__main__':
        app.run()
    

    【讨论】:

    • 是的,这适用于 python3,但我的问题是针对 2.7 的
    • 你可以使用除本地主机之外的任何网络服务器运行它,我的意思是 gunicorn 或 apache-wsgi
    • @arshpreet 我有几乎相同的代码在生产中工作,从 2016 年到现在通过 uWSGI 运行没有任何问题。使用 Gunicorn 进行测试也可以正常工作。
    • 如果你想在 Flask 中使用 concurrent.futures,请查看Flask-Executor。它提供了一种在 Flask 中初始化执行程序的更惯用的方式,并提供了一些方便的功能(保留应用上下文、应用程序工厂模式支持、装饰器)
    • 如果some_long_task1some_long_task2 引发异常怎么办?我写过类似的代码,但是发现异常被抑制了。
    【解决方案2】:

    查看Flask-Executor,它在后台使用 concurrent.futures,让您的生活变得非常轻松。

    from flask_executor import Executor
    
    executor = Executor(app)
    
    @app.route('/someJob')
    def index():
        executor.submit(long_running_job)
        return 'Scheduled a job'
    
    def long_running_job
        #some long running processing here
    

    这不仅可以在后台运行作业,还可以让它们访问应用程序上下文。它还提供了一种存储作业的方法,以便用户可以重新签入以获取状态。

    【讨论】:

    • 它必须放在应用上下文中?
    • 这很好用,我已经使用这个执行器stackoverflow.com/questions/68411571/… 完成了一些示例。但是,您能否指出一个可以同时运行多个后台任务的示例。
    【解决方案3】:

    最好的办法是使用消息代理。在 python 世界中有一些优秀的软件可以做到这一点:

    两者都是很好的选择。

    以您的方式生成线程几乎从来都不是一个好主意,因为这可能会导致处理传入请求等问题。

    如果您查看 celery 或 RQ 入门指南,他们会引导您以正确的方式完成此操作!

    【讨论】:

    • 是的,我知道 celery 和 redis 队列。但是,我正在尝试将它们作为简单的线程后台工作。不确定,如果我使用线程会弹出什么问题。你可以解释吗。另外,如果我想按照我的编码方式工作,我需要做些什么改变?根本不可能吗?
    • 我很想知道这会如何影响请求处理。
    • @San 我想这是因为 python 使用 GIL 并且 python 的线程不是经典意义上的线程。 即使是多线程应用程序在 python 中仍然是单线程的。 python 中的线程主要有助于 IO,但除此之外的任何东西实际上可能影响请求处理。而且python中的线程不是轻量级的,所以它们确实会占用大量资源。
    【解决方案4】:

    如果您想在 flask 应用程序上下文 中执行长时间运行的操作,那么它会更容易一些(与使用 ThreadPoolExecutor 相比,处理异常):

    1. 为您的应用程序定义一个命令行 (cli.py) - 因为无论如何所有 Web 应用程序都应该有一个管理员 cli
    2. subprocess.Popen(无需等待)Web 请求中的命令行。

    例如:

    # cli.py
    
    import click
    import yourpackage.app
    import yourpackage.domain
    
    app = yourpackage.app.create_app()
    
    @click.group()
    def cli():
        pass
    
    @click.command()
    @click.argument('foo_id')
    def do_something(foo_id):
        with app.app_context():
            yourpackage.domain.do_something(foo_id)
    
    if __name__ == '__main__':
        cli.add_command(do_something)
        cli()
    

    那么,

    # admin.py (flask view / controller)
    
    bp = Blueprint('admin', __name__, url_prefix='/admin')
    
    @bp.route('/do-something/<int:foo_id>', methods=["POST"])
    @roles_required('admin')
    def do_something(foo_id):
        yourpackage.domain.process_wrapper_do_something(foo_id)
        flash("Something has started.", "info")
        return redirect(url_for("..."))
    

    还有:

    # domain.py
    
    import subprocess
    
    def process_wrapper_do_something(foo_id):
        command = ["python3", "-m", "yourpackage.cli", "do_something", str(foo_id)]
        subprocess.Popen(command)
    
    def do_something(foo_id):
        print("I am doing something.")
        print("This takes some time.")
    

    【讨论】:

      【解决方案5】:

      同意@rdegges 的标记答案。抱歉,我的帐户没有足够的信用在答案下添加评论,但我想明确说明“为什么使用消息代理,而不是产生线程(或进程)”。

      关于 ThreadPoolExecutor 和 flask_executor 的其他答案是创建一个新线程(或进程,因为 flask_executor 能够)来执行“long_running_job”。这些新线程/进程将与主网站具有相同的上下文:

      对于线程:如果该线程引发异常,新线程将能够访问网站应用程序的上下文、更改或中断它; 对于进程:新进程将拥有网站应用程序上下文的副本。如果网站在初始化时不知何故使用了大量内存,那么新进程也会有一份它的副本,即使该进程不会使用这部分内存。

      另一方面,如果您使用消息代理,并且另一个应用程序检索作业消息以处理它,则新应用程序将与网站应用程序无关,也不会复制内存网络应用程序。

      将来,当你的应用程序足够大时,你可以将你的应用程序放到另一台(或多台服务器)中,很容易横向扩展。

      【讨论】:

        【解决方案6】:

        如果你想在烧瓶中使用 Celery。首先检查您的操作,并在操作完成时重定向您的 celery 任务。 如果您现在不使用 celery,可以查看此链接:FLASK: How to establish connection with mysql server?

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2015-07-04
          • 1970-01-01
          • 2014-11-17
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2022-01-02
          相关资源
          最近更新 更多