【问题标题】:Is reading a global collections.deque from within a Flask request safe?从 Flask 请求中读取全局 collections.deque 是否安全?
【发布时间】:2016-06-14 14:17:11
【问题描述】:

我有一个 Flask 应用程序,它应该在指定路线上向用户显示长时间运行的函数的结果。结果每隔一小时左右就会改变一次。为了避免用户不得不等待结果,我想让它缓存在应用程序的某个地方,并在后台以特定的时间间隔(例如每小时)重新计算它,这样用户就不必等待长时间运行的计算函数。

我想出的解决这个问题的想法如下,但是,我不完全确定在具有多线程甚至多处理网络服务器(例如@)的生产环境中这样做是否真的“安全” 987654323@、eventletgunicorn 或其他。

为了在后台重新计算结果,我使用来自APScheduler libraryBackgroundScheduler

然后将结果附加在collections.deque 对象中,该对象注册为模块范围的变量(因为据我所知,没有更好的可能性在 Flask 应用程序中保存应用程序范围的全局变量?!)。由于双端队列的最大大小设置为 2,所以旧的结果会随着新的进来而出现在双端队列的右侧。

Flask 视图现在将deque[0] 返回给请求者,它应该始终是最新的结果。我决定使用deque 而不是Queue,因为后者没有内置的可能性来读取第一项而不删除它。

因此,可以保证没有用户需要等待结果,因为旧的只有在新的进入的那一刻才会从“缓存”中消失。

请参阅下面的一个最小示例。运行脚本并点击http://localhost:5000 时,可以看到缓存在起作用——“作业完成时间”不应晚于 10 秒,再加上在“当前时间”之后重新计算它的非常短的时间,仍然不应该必须从作业函数等待time.sleep(5) 秒,直到请求返回。

对于给定的需求,这是一个有效的实现,也可以在生产就绪的 WSGI 服务器设置中工作,还是应该以不同的方式完成?

from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import time
import datetime
from collections import deque

# a global deque that is filled by APScheduler and read by a Flask view
deque = deque(maxlen=2)

# a function filling the deque that is executed in regular intervals by APScheduler
def some_long_running_job():
    print('complicated long running job started...')
    time.sleep(5)
    job_finished_at = datetime.datetime.now()
    deque.appendleft(job_finished_at)

# a function setting up the scheduler
def start_scheduler():
    scheduler = BackgroundScheduler()
    scheduler.add_job(some_long_running_job,
                      trigger='interval',
                      seconds=10,
                      next_run_time=datetime.datetime.utcnow(),
                      id='1',
                      name='Some Job name'
                      )
    scheduler.start()

# a flask application
app = Flask(__name__)

# a flask route returning an item from the global deque
@app.route('/')
def display_job_result():
    current_time = datetime.datetime.now()
    job_finished_at = deque[0]

    return '''
        Current time is: {0} <br>
        Job finished at: {1}
        '''.format(current_time, job_finished_at)

# start the scheduler and flask server
if __name__ == '__main__':
    start_scheduler()
    app.run()

【问题讨论】:

  • 不,它不是线程/进程安全的。使用 redis 或类似的东西。
  • 可以用app.run(thread=True)运行开发服务器看看。

标签: python flask thread-safety deque apscheduler


【解决方案1】:

如果您运行多个进程,线程安全是不够的:

尽管collections.deque 是线程安全的:

双端队列支持线程安全、内存高效的从双端队列的任一侧追加和弹出,在任一方向上的 O(1) 性能大致相同。

来源:https://docs.python.org/3/library/collections.html#collections.deque

根据您的配置,您的网络服务器可能会在多个进程中运行多个工作器,因此每个进程都有自己的对象实例。


即使只有一名工作人员,线程安全也可能不够:

您可能选择了异步工作器类型。异步工作器不知道何时可以安全地让出,并且您的代码必须受到保护以防止出现这样的情况:

  1. 请求 1 的工作器读取值 a 并产生
  2. 请求 2 的工作器还读取值 a,写入 a + 1 并产生
  3. 请求 1 的工作器写入值 a + 1,即使它应该是 a + 1 + 1

可能的解决方案:

使用 Flask 应用程序之外的东西来存储数据。这可以是一个数据库,在这种情况下最好是像 Redis 这样的内存数据库。或者,如果您的 worker 类型与 multiprocessing 模块兼容,您可以尝试使用 multiprocessing.managers.BaseManager 将您的 Python 对象提供给所有 worker 进程。

【讨论】:

    猜你喜欢
    • 2014-07-06
    • 1970-01-01
    • 2014-03-27
    • 1970-01-01
    • 2018-10-16
    • 2021-09-23
    • 2015-12-25
    相关资源
    最近更新 更多