【问题标题】:How to use python to schedule tasks in a Django application如何使用 python 在 Django 应用程序中安排任务
【发布时间】:2020-10-12 22:56:52
【问题描述】:

我是 Django 和一般网络框架的新手。我有一个应用程序已全部设置好并且在我的本地主机上运行良好。

该程序使用 Twitter 的 API 来收集一堆推文并将它们显示给用户。唯一的问题是我需要我的 python 程序让推文经常在后台运行。

这是使用调度模块有意义的地方,但是一旦我启动本地服务器,它就永远不会运行调度功能。我尝试阅读 cronjobs,但似乎无法让它工作。如何让 Django 定期运行特定的 python 文件?

【问题讨论】:

    标签: python django scheduled-tasks


    【解决方案1】:

    您可以使用的另一个库是django-q

    Django Q 是使用 Python 多处理的原生 Django 任务队列、调度程序和工作应用程序。 1

    django-appscheduler 一样,它可以使用 Django 附加到的数据库来运行和跟踪作业。或者,它可以使用像 Reddis 这样成熟的代理。

    唯一的问题是我需要我的 python 程序让推文经常在后台运行。

    这听起来像一个调度程序。 (Django-q 也有一个任务特性,它可以由事件触发,而不是按计划运行。调度器只是位于任务特性之上,并按定义的计划触发任务。)

    django-q 分为三个部分:

    1. 安装Django-q并进行配置;
    2. 定义要获取推文的任务函数(或函数集);
    3. 定义运行任务的计划;
    4. 运行将处理计划和任务的 django-q 集群。

    安装 django-q

    pip install django-q
    

    在 Django settings.py 中将其配置为已安装的应用程序(将其添加到安装应用程序列表中):

    INSTALLED_APPS = [
        ...
        'django_q',
        ...
    ]
    

    然后它需要它自己的配置settings.py(这是使用数据库作为代理而不是reddis或Django外部的配置。)

    # Settings for Django-Q
    # https://mattsegal.dev/simple-scheduled-tasks.html
    
    Q_CLUSTER = {
        'orm': 'default',  # should use django's ORM and database as a broker.
        'workers': 4,
        'timeout': 30,
        'retry': 60,
        'queue_limit': 50,
        'bulk': 10,
    }
    

    然后您需要在数据库上运行迁移以创建 django-q 使用的表:

    python manage.py migrate
    

    (这将在数据库中创建一堆与计划和任务相关的表。可以通过 Django 管理面板查看和操作它们。)

    定义任务函数

    然后为您要运行的tasks 创建一个新文件:

    # app/tasks.py
    def fetch_tweets():
        pass  # do whatever logic you want here
    

    定义任务计划

    我们需要将schedule 添加到数据库中以运行任务。

    python manage.py shell
    from django_q.models import Schedule
    Schedule.objects.create(
        func='app.tasks.fetch_tweets',  # module and func to run
        minutes=5,  # run every 5 minutes
        repeats=-1  # keep repeating, repeat forever
    )
    

    您不必通过 shell 执行此操作。您可以在 python 代码等模块中执行此操作。但您可能只需要创建一次计划。

    运行集群

    完成后,您需要运行将处理计划的集群。否则,在不运行集群的情况下,将永远不会处理计划和任务。对 qcluster 的调用是阻塞调用。因此,通常您希望在与 Django 服务器进程不同的窗口或进程中运行它。

    python manage.py qcluster
    

    当它运行时,你会看到如下输出:

    09:33:00 [Q] INFO Q Cluster fruit-november-wisconsin-hawaii starting.
    09:33:00 [Q] INFO Process-1:1 ready for work at 11
    09:33:00 [Q] INFO Process-1:2 ready for work at 12
    09:33:00 [Q] INFO Process-1:3 ready for work at 13
    09:33:00 [Q] INFO Process-1:4 ready for work at 14
    09:33:00 [Q] INFO Process-1:5 monitoring at 15
    09:33:00 [Q] INFO Process-1 guarding cluster fruit-november-wisconsin-hawaii
    09:33:00 [Q] INFO Q Cluster fruit-november-wisconsin-hawaii running.
    

    如果您想了解如何将任务与报告、电子邮件或信号等联系起来,还有一些 example documentation 非常有用。

    【讨论】:

      【解决方案2】:

      我遇到过类似的情况,并且在django-apscheduler 上取得了很大的成功。它都是独立的——它与 Django 服务器一起运行,并且在 Django 数据库中跟踪作业,因此您无需配置任何外部 cron 作业或任何调用脚本的东西。

      以下是快速启动和运行的基本方法,但本文末尾的链接包含更多文档和详细信息以及更高级的选项。

      使用pip install django-apscheduler 安装,然后将其添加到您的INSTALLED_APPS

      INSTALLED_APPS = [
          ...
          'django_apscheduler',
          ...
      ]
      

      安装后,确保在数据库上运行 makemigrationsmigrate

      创建一个scheduler python 包(在您的应用程序目录中名为scheduler 的文件夹,其中包含一个空白__init__.py)。然后,在其中创建一个名为 scheduler.py 的文件,该文件应如下所示:

      from apscheduler.schedulers.background import BackgroundScheduler
      from django_apscheduler.jobstores import DjangoJobStore, register_events
      from django.utils import timezone
      from django_apscheduler.models import DjangoJobExecution
      import sys
      
      # This is the function you want to schedule - add as many as you want and then register them in the start() function below
      def deactivate_expired_accounts():
          today = timezone.now()
          ...
          # get accounts, expire them, etc.
          ...
      
      
      def start():
          scheduler = BackgroundScheduler()
          scheduler.add_jobstore(DjangoJobStore(), "default")
          # run this job every 24 hours
          scheduler.add_job(deactivate_expired_accounts, 'interval', hours=24, name='clean_accounts', jobstore='default')
          register_events(scheduler)
          scheduler.start()
          print("Scheduler started...", file=sys.stdout)
      

      在您的 apps.py 文件中(如果不存在则创建它):

      from django.apps import AppConfig
      
          class AppNameConfig(AppConfig):
              name = 'your_app_name'
              def ready(self):
                  from scheduler import scheduler
                  scheduler.start()
      

      请注意:在settings.py 文件中使用DEBUG = True 时,请在设置--noreload 标志(即python manage.py runserver localhost:8000 --noreload)的情况下运行开发服务器,否则计划任务将启动并运行两次。

      另外,django-apscheduler 不允许您将任何参数传递给计划运行的函数。这是一个限制,但我从来没有遇到过问题。如果你真的需要,你可以从一些外部源加载它们,比如 Django 数据库。

      您可以在 apscheduler 任务(函数)中使用所有标准的 Django 库、包和函数。例如,查询模型、调用外部 API、解析响应/数据等。它是无缝集成的。

      一些额外的链接:

      【讨论】:

      • 我建议在 add_job 调用中添加一个 id="some_id" 以防止每次启动都重复作业
      • 除了 Michael 的警告:您也可以只检查 ready() 方法中的 RUN_MAIN env var,在第二次迭代中它等于 True。这种干净简单的解决方法允许您保持开发服务器重新加载。
      • 另外不要忘记在应用的__init__.py:default_app_config = 'myApp.apps.myAppConfig'中设置默认应用配置
      • register_events 已被弃用,并将在未来的版本中删除。不再需要调用此方法,因为DjangoJobStore 将在调度程序启动时自动注册它关心的事件。
      • @vpap 好问题!有一天我花了一些时间才明白那里发生了什么。答案在这里django/utils/autoreload.py。最初没有RUN_MAIN 环境变量,代码中只有DJANGO_AUTORELOAD_ENV = 'RUN_MAIN'。因此,在run_with_reloader 中,程序采用else 代码块,而在restart_with_reloader 中,环境变量设置为new_environ = {**os.environ, DJANGO_AUTORELOAD_ENV: 'true'}
      猜你喜欢
      • 1970-01-01
      • 2011-01-30
      • 2015-07-27
      • 2012-01-23
      • 2020-03-02
      • 1970-01-01
      • 1970-01-01
      • 2012-05-02
      • 1970-01-01
      相关资源
      最近更新 更多