【问题标题】:how to get celery tasks id如何获取芹菜任务ID
【发布时间】:2017-05-18 15:17:24
【问题描述】:

我使用 celery beat 设置了一个周期性任务。任务运行,我可以在控制台中看到结果。 我想要一个 python 脚本来回忆任务抛出的结果。

我可以这样做:

#client.py
from cfg_celery import app
task_id = '337fef7e-68a6-47b3-a16f-1015be50b0bc'
try:
    x = app.AsyncResult(id)
    print(x.get())
except:
    print('some error')

无论如何,如您所见,对于此测试,我必须复制 celery beat 控制台(可以这么说)抛出的 task_id 并将其硬编码到我的脚本中。显然,这在实际生产中是行不通的。

我在 celery 配置文件中设置了task_id

#cfg_celery.py
app = Celery('celery_config',
        broker='redis://localhost:6379/0',
        include=['taskos'],
        backend = 'redis'
        )
app.conf.beat_schedule = {
    'something': {
        'task': 'tasks.add',
        'schedule': 10.0,
        'args': (16, 54),
        'options' : {'task_id':"my_custom_id"},
    }
}

这样我可以这样读:

#client.py
from cfg_celery import app
task_id = 'my_custom_id'
try:
    x = app.AsyncResult(id)
    print(x.get())
except:
    print('some error')

这种方法的问题是我丢失了之前的结果(之前调用client.py)。

有什么方法可以读取 celery 后端中的 task_id 列表吗? 如果我有多个定期任务,我可以从每个定期任务中获取task_id 的列表吗? 我可以使用app.tasks.key() 来完成这个,如何?

pd:不是英语母语,加上 celery 新手,如果我用错了一些术语,请多多关照。

【问题讨论】:

  • 我突然想到,为了得到我想要的东西(从其他 python 实例中获取结果),我需要使用 redis 函数来存储和检索值。我将使用 zadd 将结果写入 redis,并在“client.py”上使用 zrange 来检索它们。

标签: python-3.x celery


【解决方案1】:

好的。我不确定是否没有人回答这个问题是因为很难还是因为我的问题太愚蠢了。 无论如何,我想做的是从另一个 python 进程中获取我的“celery-beat”任务的结果。 在同一个过程中没有问题,我可以访问任务 ID,从那里开始一切都很容易。但是从其他过程中我没有找到一种方法来检索已完成任务的列表。

我尝试了 python-RQ(很好)但是当我看到使用 RQ 时我也无法做到这一点,我开始明白我必须手动使用 redis 存储功能。所以我得到了我想要的,这样做:

。使用 'bind=True' 可以在任务函数中进行自省。 .一旦我得到了函数的结果,我把它写在redis的一个列表中(我做了一些技巧来限制这个列表的大小) .现在我可以从一个独立的进程连接到同一个 redis 服务器并检索存储在此类列表中的结果。

我的文件最终是这样的:

cfg_celery.py :在这里我定义了任务的调用方式。

#cfg_celery.py
from celery import Celery

appo = Celery('celery_config',
        broker='redis://localhost:6379/0',
        include=['taskos'],
        backend = 'redis'
        )

'''
urlea se decoro como periodic_task. no hay necesidad de darla de alta aqi.
pero como add necesita args, la doy de alta manualmente p pasarselos
'''
appo.conf.beat_schedule = {
    'q_loco': {
        'task': 'taskos.add',
        'schedule': 10.0,
        'args': (16, 54),
        # 'options' : {'task_id':"lcura"},
    }
}

taskos.py :这些是任务。

#taskos.py
from cfg_celery import appo
from celery.decorators import periodic_task
from redis import Redis

from datetime import timedelta
import requests, time

rds = Redis()

@appo.task(bind=True)
def add(self,a, b):
    #result of operation. very dummy.
    result = a + b

    #storing in redis
    r= (self.request.id,time.time(),result)
    rds.lpush('my_results',r)

    # for this test i want to have at most 5 results stored in redis
    long = rds.llen('my_results')
    while long > 5:
        x = rds.rpop('my_results')
        print('popping out',x)
        long = rds.llen('my_results')
        time.sleep(1)
    return a + b


@periodic_task(run_every=20)
def urlea(url='https://www.fullstackpython.com/'):
    inicio = time.time()
    R = dict()
    try:
        resp = requests.get(url)
        R['vato'] = url+" = " + str(resp.status_code*10)
        R['num palabras'] = len(resp.text.split())
    except:
        R['vato'] = None
        R['num palabras'] = 0        
    print('u {} : {}'.format(url,time.time()-inicio))
    time.sleep(0.8) # truco pq se vea mas claramente la dif.
    return R

consumer.py : 可以得到结果的独立进程。

#consumer.py
from redis import Redis
nombre_lista = 'my_results'

rds = Redis()

tamaño = rds.llen(nombre_lista)
ultimos_resultados = list()
for i in range(tamaño):
    ultimos_resultados.append(rds.rpop(nombre_lista))

print(ultimos_resultados)

我对编程比较陌生,我希望这个答案可以帮助像我这样的菜鸟。如果我有错误,请随时根据需要进行更正。

【讨论】: