django+celery项目结构

- project_name
      - app01
            - __init__.py
            - admin.py
            - views.py
            - modes.py
            - tasks.py             #celery用来执行任务的文件,task里的任务由views函数里去触发
            - urls.py            
            - views.py
      - project_name
            - __init__.py      #初始化celery
            - celery.py            #celery 定义实例
            - settings.py           #用来配置redis或rabbitmq地址
            - urls.py
            - views.py
            - wsgi.py
      - templates
      - static
      - manager.py
      - db.sqlite3    

 

celery.py

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')    #这里要写项目名称

app = Celery('project_name')

app.config_from_object('django.conf:settings', namespace='CELERY')   #这里配置settings里与celery相关配置的前缀

# Load task modules from all registered Django app configs.

app.autodiscover_tasks()

@app.task(bind=True)

def debug_task(self):

    print('Request: {0!r}'.format(self.request))

 

project_name/__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when

# Django starts so that shared_task will use this app.

from .celery import app as celery_app

__all__ = ['celery_app']

 

tasks.py

#!/usr/bin/env python
#-*-coding:utf-8-*-
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import subprocess

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def cmd_run(cmd):
    result = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    return result.stdout.read().decode("utf-8")

 

app01/views.py

from django.shortcuts import render,redirect,HttpResponse
from django_celery import tasks    #引入task
from celery.result import AsyncResult
 def test_celery(request):
  #这里用来触发tasks里的任务 res
= tasks.cmd_run.delay( "ipconfig", ) #print (res.get)  #如果在此处直接get会变成同步 return HttpResponse(res.task_id)    #获取taskid

#获取任务执行状态返回给前端
def task_res(request):
  result = AsyncResult(id=task_id)
  return HttpResponse(result.status)

 

app01/urls.py

from django.conf.urls import url,include
from django.contrib import admin
from django_celery import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^test_celery/$',views.test_celery)
]

 

启动worker

 

celery -A wecaht worker -l info -P eventlet

 

django&celery 定时任务

pip3 install django-celery-beat

##settings.py 里注册 django-celery-beat
INSTALLED_APPS = [
    'django_celery_beat',
]

python manage.py migrate

D:\django-project\wechat>python manage.py migrate
Operations to perform:
  Apply all migrations: auth, sessions, django_celery_beat, contenttypes, admin
Running migrations:
  Rendering model states... DONE
  Applying contenttypes.0001_initial... OK
  Applying auth.0001_initial... OK
  Applying admin.0001_initial... OK
  Applying admin.0002_logentry_remove_auto_add... OK
  Applying contenttypes.0002_remove_content_type_name... OK
  Applying auth.0002_alter_permission_name_max_length... OK
  Applying auth.0003_alter_user_email_max_length... OK
  Applying auth.0004_alter_user_username_opts... OK
  Applying auth.0005_alter_user_last_login_null... OK
  Applying auth.0006_require_contenttypes_0002... OK
  Applying auth.0007_alter_validators_add_error_messages... OK
  Applying django_celery_beat.0001_initial... OK
  Applying django_celery_beat.0002_auto_20161118_0346... OK
  Applying django_celery_beat.0003_auto_20161209_0049... OK
  Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
  Applying django_celery_beat.0006_auto_20180210_1226... OK
  Applying sessions.0001_initial... OK

 

登录后台

django和celery结合应用

 

启动celery beat

celery -A project_name beat -l info -S django

 

每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到

分类:

技术点:

相关文章: