【发布时间】:2016-12-24 12:05:46
【问题描述】:
有没有办法以编程方式确定当前正在导入/运行的模块是在 celery worker 的上下文中完成的?
我们已经决定在运行 Celery worker 之前设置一个环境变量,并在代码中检查这个环境变量,但是不知道有没有更好的方法?
【问题讨论】:
-
celery.current_app也许?
有没有办法以编程方式确定当前正在导入/运行的模块是在 celery worker 的上下文中完成的?
我们已经决定在运行 Celery worker 之前设置一个环境变量,并在代码中检查这个环境变量,但是不知道有没有更好的方法?
【问题讨论】:
celery.current_app 也许?
简单,
import sys
IN_CELERY_WORKER_PROCESS = sys.argv and sys.argv[0].endswith('celery')\
and 'worker' in sys.argv
if IN_CELERY_WORKER_PROCESS:
print ('Im in Celery worker')
http://percentl.com/blog/django-how-can-i-detect-whether-im-running-celery-worker/
【讨论】:
if 'worker' in sys.argv,但我担心如果另一个人导入了我的项目并且他还在他的芹菜项目中使用了参数'worker'。
从 celery 4.2 开始,您还可以通过在 worker_ready 信号上设置一个标志来做到这一点
在celery.py:
from celery.signals import worker_ready
app = Celery(...)
app.running = False
@worker_ready.connect
def set_running(*args, **kwargs):
app.running = True
现在您可以使用全局应用实例检查您的任务 看看你是否在跑步。这对于确定使用哪个记录器非常有用。
【讨论】:
您可以使用Celery 应用程序实例类中的current_worker_task 属性。 Docs here.
定义了以下任务:
# whatever_app/tasks.py
celery_app = Celery(app)
@celery_app.task
def test_task():
if celery_app.current_worker_task:
return 'running in a celery worker'
return 'just running'
您可以在 python shell 上运行以下命令:
In [1]: from whatever_app.tasks import test_task
In [2]: test_task()
Out[2]: 'just running'
In [3]: r = test_task.delay()
In [4]: r.result
Out[4]: u'running in a celery worker'
注意:显然,要让test_task.delay() 成功,您需要至少有一个 celery worker 运行并配置为从whatever_app.tasks 加载任务。
【讨论】:
使用名称启动工作人员是一种很好的做法,这样可以更轻松地管理(停止/杀死/重新启动)它们。您可以使用-n 来命名工作人员。
celery worker -l info -A test -n foo
现在,在您的脚本中,您可以使用app.control.inspect 来查看该工作器是否正在运行。
In [22]: import test
In [23]: i = test.app.control.inspect(['foo'])
In [24]: i.app.control.ping()
Out[24]: [{'celery@foo': {'ok': 'pong'}}]
【讨论】:
根据您的用例场景的具体情况,您可以通过检查是否设置了请求 ID 来检测它:
@app.task(bind=True)
def foo(self):
print self.request.id
如果您以foo.delay() 调用上述内容,则任务将发送给工作人员,self.request.id 将设置为唯一编号。如果你以foo() 调用它,那么它将在你当前的进程中执行,self.request.id 将是None。
【讨论】:
添加环境变量是检查模块是否由 celery worker 运行的好方法。在任务提交进程中,我们可以设置环境变量,以标记它没有在 celery worker 的上下文中运行。
但更好的方法可能是使用一些 celery 信号,这可能有助于了解模块是在工作人员还是任务提交者中运行。例如,worker-process-init 信号被发送到每个子任务执行器进程(在预分叉模式下),并且处理程序可用于设置一些全局变量,指示它是一个工作进程。
【讨论】: