【问题标题】:How can I detect whether I'm running in a Celery worker?如何检测我是否在 Celery worker 中运行?
【发布时间】:2016-12-24 12:05:46
【问题描述】:

有没有办法以编程方式确定当前正在导入/运行的模块是在 celery worker 的上下文中完成的?

我们已经决定在运行 Celery worker 之前设置一个环境变量,并在代码中检查这个环境变量,但是不知道有没有更好的方法?

【问题讨论】:

  • celery.current_app 也许?

标签: python celery


【解决方案1】:

简单,

import sys

IN_CELERY_WORKER_PROCESS = sys.argv and sys.argv[0].endswith('celery')\
    and 'worker' in sys.argv

if IN_CELERY_WORKER_PROCESS:
    print ('Im in Celery worker')

http://percentl.com/blog/django-how-can-i-detect-whether-im-running-celery-worker/

【讨论】:

  • 现在我正在使用相同的解决方案,例如if 'worker' in sys.argv,但我担心如果另一个人导入了我的项目并且他还在他的芹菜项目中使用了参数'worker'。
【解决方案2】:

从 celery 4.2 开始,您还可以通过在 worker_ready 信号上设置一个标志来做到这一点

celery.py:


from celery.signals import worker_ready
app = Celery(...)
app.running = False

@worker_ready.connect
def set_running(*args, **kwargs):
    app.running = True


现在您可以使用全局应用实例检查您的任务 看看你是否在跑步。这对于确定使用哪个记录器非常有用。

【讨论】:

    【解决方案3】:

    您可以使用Celery 应用程序实例类中的current_worker_task 属性。 Docs here.

    定义了以下任务:

     # whatever_app/tasks.py
    
     celery_app = Celery(app)
    
     @celery_app.task
     def test_task():
         if celery_app.current_worker_task:
             return 'running in a celery worker'
         return 'just running'
    

    您可以在 python shell 上运行以下命令:

    In [1]: from whatever_app.tasks import test_task
    
    In [2]: test_task()
    Out[2]: 'just running'
    
    In [3]: r = test_task.delay()
    
    In [4]: r.result
    Out[4]: u'running in a celery worker' 
    

    注意:显然,要让test_task.delay() 成功,您需要至少有一个 celery worker 运行并配置为从whatever_app.tasks 加载任务。

    【讨论】:

    • 如果 Celery 正在对工作人员执行另一个任务,而这是离线执行的,这不会产生误报吗?
    【解决方案4】:

    使用名称启动工作人员是一种很好的做法,这样可以更轻松地管理(停止/杀死/重新启动)它们。您可以使用-n 来命名工作人员。

    celery worker -l info -A test -n foo
    

    现在,在您的脚本中,您可以使用app.control.inspect 来查看该工作器是否正在运行。

    In [22]: import test
    
    In [23]: i = test.app.control.inspect(['foo'])
    
    In [24]: i.app.control.ping()
    Out[24]: [{'celery@foo': {'ok': 'pong'}}]
    

    您可以阅读更多about this in celery worker docs

    【讨论】:

      【解决方案5】:

      根据您的用例场景的具体情况,您可以通过检查是否设置了请求 ID 来检测它:

      @app.task(bind=True)
      def foo(self):
          print self.request.id
      

      如果您以foo.delay() 调用上述内容,则任务将发送给工作人员,self.request.id 将设置为唯一编号。如果你以foo() 调用它,那么它将在你当前的进程中执行,self.request.id 将是None

      【讨论】:

        【解决方案6】:

        添加环境变量是检查模块是否由 celery worker 运行的好方法。在任务提交进程中,我们可以设置环境变量,以标记它没有在 celery worker 的上下文中运行。

        但更好的方法可能是使用一些 celery 信号,这可能有助于了解模块是在工作人员还是任务提交者中运行。例如,worker-process-init 信号被发送到每个子任务执行器进程(在预分叉模式下),并且处理程序可用于设置一些全局变量,指示它是一个工作进程。

        【讨论】:

        • 不幸的是,我需要知道我是否工作得更早(当我导入我的任务时......)。所以 envvar 是我前进的唯一途径......
        猜你喜欢
        • 2017-01-08
        • 1970-01-01
        • 2012-01-20
        • 2017-11-14
        • 2014-05-17
        • 1970-01-01
        • 2016-11-28
        • 2010-10-19
        • 1970-01-01
        相关资源
        最近更新 更多