【问题标题】:Pass additional arguments to the callback function in Celery chord将附加参数传递给 Celery chord 中的回调函数
【发布时间】:2017-12-06 15:08:31
【问题描述】:

我需要在 celery chords 中将其他参数传递给我的回调函数。 (Celery 版本:4.1.0(潜在调用)和 Python 2.7

考虑以下示例:

program.py

from tasks import get_stock_info, call_back
from celery import group, chord

def chord_queue():
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
    callback = call_back.subtask()
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
    res = chord(header,queue='susanoo_dev')(callback)
    res1 = chord(header1,queue='susanoo_core')(callback)
    print(res.get())
    print(res1.get())
    print("We are done")

if __name__ == '__main__':
    chord_queue()

tasks.py

from pandas_datareader import data
from celery_app import app
import time

@app.task
def get_stock_info(delay):
    print('hello Celery--------')
    time.sleep(delay)
    print('Whats up')
    return 10

@app.task
def call_back(num):
    print("Everything is done------")
    print("Everything is done------")
    return sum(num)

celery_app.py

from celery import Celery
from kombu import Queue

app = Celery('tasks', broker='amqp://my_user:my_pass@localhost/my_vhost', backend='redis://localhost:6379/0')

CELERY_CONFIG = {
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (Queue('dev'), Queue('core'),)
}

app.conf.update(**CELERY_CONFIG)

现在在这种情况下,当 chord 被调用并且所有 3 个 get_stock_info 任务完成后,call_back 被调用,其值 10 ,即 get_stock_info 的返回值会自动传递。 现在,除了返回值之外,我还想将一个附加参数说一个字符串作为“abcd”传递给回调函数。

我该怎么做?

我已经按照一些博客/SO 答案等的建议尝试过这样做。

program.py

def chord_queue():
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
    callback = call_back.subtask(kwargs={'my_str' : 'abcd'})
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
    res = chord(header,queue='susanoo_dev' )(callback)
    res1 = chord(header1,queue='susanoo_core')(callback)
    print(res.get())
    print(res1.get())
    print("We are done")

tasks.py

@app.task
def call_back(num, my_str):
    print("Everything is done------")
    print("Everything is done------")
    print my_str
    return my_str, sum(num)

但这似乎不起作用并引发以下错误:

celery.backends.base.ChordError: 回调错误: TypeError("call_back() got an unexpected keyword argument 'my_str'",)

【问题讨论】:

    标签: python python-2.7 callback celery chord


    【解决方案1】:

    得到了答案。感谢一位帮助我解决问题的朋友。 上述解决方案中的所有错误都是在 call_back() 的定义中未将 my_str 定义为关键字参数。

    所以可行的解决方案是:

    program.py

    def chord_queue():
        header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
        callback = call_back.subtask(kwargs={'my_str' : 'abcd'})
        header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
        res = chord(header,queue='susanoo_dev' )(callback)
        res1 = chord(header1,queue='susanoo_core')(callback)
        print(res.get())
        print(res1.get())
        print("We are done")
    

    task.py

    @app.task
    def call_back(num, my_str=None):
        print("Everything is done------")
        print("Everything is done------")
        print my_str
        return my_str, sum(num)
    

    它按预期工作,没有任何问题。

    【讨论】:

      猜你喜欢
      • 2019-08-09
      • 1970-01-01
      • 2011-09-20
      • 2012-03-23
      • 1970-01-01
      • 2011-06-29
      • 1970-01-01
      • 1970-01-01
      • 2016-04-26
      相关资源
      最近更新 更多