【问题标题】:Initializing a worker with arguments using Celery使用 Celery 初始化带有参数的 worker
【发布时间】:2015-01-20 03:18:01
【问题描述】:

我在寻找对我来说似乎相对简单的东西时遇到了问题。

我将 Celery 3.1 与 Python 3 一起使用,我想用参数初始化我的工作人员,以便他们可以使用这些详细信息进行设置。

具体而言:这些工作人员将执行需要使用身份验证凭据与第三方 API 交互的任务。在使用任何任务之前,worker 必须将身份验证详细信息传递给 API 服务器(身份验证详细信息在第一次身份验证请求后存储在 cookie 中)。

我想在从 CLI 启动工作人员时将这些登录凭据传递给工作人员。然后,我希望工作人员使用它们进行身份验证并存储会话以供使用未来任务时使用(理想情况下,这将存储在可以从任务访问的属性中)。

Celery 可以做到这一点吗?

附带说明一下,我考虑过将 requests.session 对象(来自 Python requests 库)作为任务参数传递,但这需要进行序列化,这看起来不受欢迎。

【问题讨论】:

    标签: python celery


    【解决方案1】:

    我建议使用抽象任务基类并缓存requests.session

    来自 Celery 文档:

    任务不是针对每个请求都实例化,而是在任务注册表中注册为全局实例。

    这意味着每个进程只会调用一次 __init__ 构造函数,并且任务类在语义上更接近于 Actor。

    这对于缓存资源也很有用...

    import requests
    from celery import Task
    
    class APITask(Task):
        """API requests task class."""
    
        abstract = True
    
        # the cached requests.session object
        _session = None
    
        def __init__(self):
            # since this class is instantiated once, use this method
            # to initialize and cache resources like a requests.session
            # or use a property like the example below which will create
            # a requests.session only the first time it's accessed
    
        @property
        def session(self):
            if self._session is None:
                # store the session object for the first time
                session = requests.Session()
                session.auth = ('user', 'pass')
    
                self._session = session
    
            return self._session
    

    现在,当您创建将发出 API 请求的任务时:

    @app.task(base=APITask, bind=True)
    def call_api(self, url):
        # self will refer to the task instance (because we're using bind=True)
        self.session.get(url)
    

    您还可以使用 app.task 装饰器作为额外参数传递 API 身份验证选项,该参数将设置在任务的 __dict__ 上,例如:

    # pass a custom auth argument
    @app.task(base=APITask, bind=True, auth=('user', 'pass'))
    def call_api(self, url):
        pass
    

    并使基类使用传递的身份验证选项:

    class APITask(Task):
        """API requests task class."""
    
        abstract = True
    
        # the cached requests.session object
        _session = None
    
       # the API authentication
       auth = ()
    
        @property
        def session(self):
            if self._session is None:
                # store the session object for the first time
                session = requests.Session()
                # use the authentication that was passed to the task
                session.auth = self.auth
    
                self._session = session
    
            return self._session
    

    您可以在 Celery 文档网站上阅读更多内容:

    现在回到你原来的问题,即从命令行向工作人员传递额外的参数:

    在 Celery 文档 Adding new command-line options 中有一个关于此的部分,这是从命令行向工作人员传递用户名和密码的示例:

    $ celery worker -A appname --username user --password pass
    

    代码:

    from celery import bootsteps
    from celery.bin import Option
    
    
    app.user_options['worker'].add(
        Option('--username', dest='api_username', default=None, help='API username.')
    )
    
    app.user_options['worker'].add(
        Option('--password', dest='api_password', default=None, help='API password.')
    )
    
    
    class CustomArgs(bootsteps.Step):
    
        def __init__(self, worker, api_username, api_password, **options):
            # store the api authentication
            APITask.auth = (api_username, api_password)
    
    
    app.steps['worker'].add(CustomArgs)
    

    【讨论】:

    • 太好了,我很难从文档中破译所有这些内容。谢谢你把它布置得这么好。
    • 抱歉再次挖掘这个问题,您能否阐明如何将命令行参数从 Boostep 传递给任务初始化(以便我可以使用从命令行提供的用户名和密码)。目标是不以纯文本形式存储我的 API 凭据。
    • @JoshuaGilman 抱歉耽搁了,我用一个例子更新了答案。
    • @Pierre 我有与这个问题类似的要求,我想将一些额外的属性附加到 wroker 而不是 Task,这里是堆栈溢出问题的链接stackoverflow.com/questions/42834227/…
    【解决方案2】:

    我认为您可以使用命令行参数调用您编写的脚本。类似于以下内容:

    my_script.py username password
    

    在您的脚本中,您可以将 main 函数包装在 @celery.task@app.task 装饰器中。

    import sys
    
    from celery import Celery
    
    cel = Celery() # put whatever config info you need in here
    
    @celery.task
    def main():
        username, password = sys.argv[1], sys.argv[2]
    

    类似的东西应该让你开始。请务必查看 Python 的 argparse 以获得更复杂的参数解析。

    【讨论】:

    • 谢谢,但是您不能通过调用 python 脚本来启动工作进程。您必须像这样调用 celery:celery -A proj worker -l info
    • 那么我们这里肯定有一个非常奇怪的设置......因为它看起来就是这样工作的。我得再研究一下我们在做什么。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-02-25
    • 2012-05-22
    • 2013-10-02
    • 2023-01-15
    • 1970-01-01
    • 1970-01-01
    • 2020-06-26
    相关资源
    最近更新 更多