我建议使用抽象任务基类并缓存requests.session。
来自 Celery 文档:
任务不是针对每个请求都实例化,而是在任务注册表中注册为全局实例。
这意味着每个进程只会调用一次 __init__ 构造函数,并且任务类在语义上更接近于 Actor。
这对于缓存资源也很有用...
import requests
from celery import Task
class APITask(Task):
"""API requests task class."""
abstract = True
# the cached requests.session object
_session = None
def __init__(self):
# since this class is instantiated once, use this method
# to initialize and cache resources like a requests.session
# or use a property like the example below which will create
# a requests.session only the first time it's accessed
@property
def session(self):
if self._session is None:
# store the session object for the first time
session = requests.Session()
session.auth = ('user', 'pass')
self._session = session
return self._session
现在,当您创建将发出 API 请求的任务时:
@app.task(base=APITask, bind=True)
def call_api(self, url):
# self will refer to the task instance (because we're using bind=True)
self.session.get(url)
您还可以使用 app.task 装饰器作为额外参数传递 API 身份验证选项,该参数将设置在任务的 __dict__ 上,例如:
# pass a custom auth argument
@app.task(base=APITask, bind=True, auth=('user', 'pass'))
def call_api(self, url):
pass
并使基类使用传递的身份验证选项:
class APITask(Task):
"""API requests task class."""
abstract = True
# the cached requests.session object
_session = None
# the API authentication
auth = ()
@property
def session(self):
if self._session is None:
# store the session object for the first time
session = requests.Session()
# use the authentication that was passed to the task
session.auth = self.auth
self._session = session
return self._session
您可以在 Celery 文档网站上阅读更多内容:
现在回到你原来的问题,即从命令行向工作人员传递额外的参数:
在 Celery 文档 Adding new command-line options 中有一个关于此的部分,这是从命令行向工作人员传递用户名和密码的示例:
$ celery worker -A appname --username user --password pass
代码:
from celery import bootsteps
from celery.bin import Option
app.user_options['worker'].add(
Option('--username', dest='api_username', default=None, help='API username.')
)
app.user_options['worker'].add(
Option('--password', dest='api_password', default=None, help='API password.')
)
class CustomArgs(bootsteps.Step):
def __init__(self, worker, api_username, api_password, **options):
# store the api authentication
APITask.auth = (api_username, api_password)
app.steps['worker'].add(CustomArgs)