【问题标题】:Celery task PENDING芹菜任务待处理
【发布时间】:2014-10-02 01:07:04
【问题描述】:

我基本上希望能够创建一个由groupchains 组成的chord。除了我似乎无法完成这项工作之外,所有子链都必须在和弦回调被触发之前完成。

所以我的想法是创建一个while 循环,例如:

data = [foo.delay(i) for i in bar]
complete = {}
L = len(data)
cnt = 0
while cnt != L:
    for i in data:
        ID = i.task_id
        try:
            complete[ID]
        except KeyError:
            if i.status == 'SUCCESS':
                complete[ID] = run_hourly.delay(i.result)
                cnt += 1
                if cnt >= L:
                    return complete.values()

这样当一项任务准备就绪时,它就可以被执行,而不必等待其他任务完成。

我遇到的问题是某些任务的状态永远不会超过'PENDING' 状态。

如果我在for loop 中添加time.sleep(x) 行,那么所有任务都将达到'SUCCESS' 状态,但是data 中有大量sub tasks,该解决方案变得非常低效。

我使用 memcached 作为我的结果后端和 rabbitmq。我的猜测是 for 循环的速度迭代 data 并调用它的任务的属性会创建一个竞争条件,它会中断与 celery 消息传递的连接,从而使这些僵尸任务保持在 'PENDING' 状态。但话又说回来,我可能完全错了,这肯定不是第一次..

我的问题

为什么在迭代刚刚启动的任务列表时需要time.sleep(foo) 来避免永久PENDING 任务?

当一个 celery 任务执行一个循环时,它会阻塞吗?当我尝试关闭陷入无限循环的工作人员时,我无法这样做,并且必须手动找到运行工作人员的 python 进程并将其杀死。如果我让 worker 最终运行,运行的 python 进程将开始消耗几 gigs 的内存,呈指数增长,而且似乎没有限制。

对此事的任何见解将不胜感激。我也愿意接受有关如何完全避免 while loop 的建议。

感谢您的宝贵时间。谢谢。

【问题讨论】:

  • 好吧,它对你没有多大帮助,但你应该坚持使用chord(或者只是一个group和一个callback),因为这正是你需要的。如果它不起作用,则说明其他问题(在您的任务、设置等内部),例如确保您不要忽略设置中的结果。

标签: python multiprocessing celery python-multithreading celery-task


【解决方案1】:

我的 chordgroupchains 组成,正在从 celery 任务中构建和执行。如果您需要访问这些任务的结果,这将产生问题。下面是我尝试做的事情的总结,我最终做了什么,以及我认为我在这个过程中学到的东西,所以也许它可以帮助其他人。

--common_tasks.py--

from cel_test.celery import app

@app.task(ignore_result=False)
def qry(sql, writing=False):
    import psycopg2
    conn = psycopg2.connect(dbname='space_test', user='foo', host='localhost')
    cur = conn.cursor()
    cur.execute(sql)
    if writing == True:
        cur.close()
        conn.commit()
        conn.close()
        return

    a = cur.fetchall()
    cur.close()
    conn.close()
    return a

--foo_tasks.py--

from __future__ import absolute_import

from celery import chain, chord, group
import celery
from cel_test.celery import app
from weather.parse_weather import parse_weather
from cel_test.common_tasks import qry
import cPickle as PKL
import csv
import requests


@app.task(ignore_results=False)
def write_the_csv(data, file_name, sql):
    with open(file_name, 'wb') as fp:
        a = csv.writer(fp, delimiter=',')
        for i in data:
            a.writerows(i)
    qry.delay(sql, True)
    return True

@app.task(ignore_result=False)
def idx_point_qry(DIR='/bar/foo/'):
    with open(''.join([DIR, 'thing.pkl']), 'rb') as f:
            a = PKL.load(f)
    return [(i, a[i]) for i in a]

@app.task(ignore_results=False)
def load_page(url):
    page = requests.get(url, **{'timeout': 30})
    if page.status_code == 200:
        return page.json()

@app.task(ignore_results=False)
def run_hourly(page, info):
    return parse_weather(page, info).hourly()

@app.task(ignore_results=False)
def pool_loop(info):
    data = []
    for iz in info:
        a = chain(load_page.s(url, iz), run_hourly.s())()
        data.append(a)
    return data



@app.task(ignore_results=False)
def run_update(file_name, sql, writing=True):
    chain(idx_point_qry.s(), pool_loop.s(file_name, sql), write_the_csv.s(writing, sql))()
    return True

--- 单独的.py 文件---

from foo_tasks import *

def update_hourly_weather():
    fn = '/weather/csv_data/hourly_weather.csv'
    run_update.delay(fn, "SELECT * from hourly_weather();")
    return True

update_hourly_weather()

我尝试了上面列出的 30 种左右的 .py 文件组合以及其中存在的几种代码组合。我试过chordsgroupschains,从不同的任务启动任务,合并任务。

一些组合最终奏效了,但我不得不在data 上的wrtie_the_csv 任务中直接调用.get(),但 celery 抛出了一个warning,在 4.0 中调用 get() 任务会引发错误所以我认为我不应该这样做..

本质上,我的问题是(现在仍然是)糟糕的任务设计以及它们之间的流程。这导致我遇到从其他任务中同步任务的问题。

我在问题中提出的 while 循环是尝试在另一个任务状态变为 COMPLETE 时异步启动任务,然后在该任务完成时启动另一个任务等等......而不是让 celery 同步执行所以通过致电chordchain。我似乎发现(我不确定我对此是否正确)是,在 celery 任务中,您无法访问识别此类事物所需的范围。以下引自state portion in the celery documentation on tasks“断言世界是任务的责任”。

我正在启动该任务不知道存在的任务,因此对它们一无所知。

我的解决方案是加载和遍历.pkl 文件并同步启动一组chains 并带有回调。我基本上用下面的代码替换了pool_loop 任务,并同步而不是异步启动任务。

--the_end.py--

from foo_tasks import *
from common_tasks import *

def update_hourly_weather():
    fn = '/weather/csv_data/hourly_weather.csv'
    dd = []
    idx = idx_point_qry()
    for i in idx:
        dd.append(chain(load_page.s(i, run_hourly.s(i)))
    vv = chord(dd)(write_the_csv.s(fn, "SELECT * from hourly_weather();"))
    return

update_hourly_weather()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-05-29
    • 2018-02-28
    • 2015-04-01
    • 2020-07-09
    • 2018-04-23
    • 2023-03-29
    • 1970-01-01
    • 2013-03-16
    相关资源
    最近更新 更多