我的 chord 由 group 和 chains 组成,正在从 celery 任务中构建和执行。如果您需要访问这些任务的结果,这将产生问题。下面是我尝试做的事情的总结,我最终做了什么,以及我认为我在这个过程中学到的东西,所以也许它可以帮助其他人。
--common_tasks.py--
from cel_test.celery import app
@app.task(ignore_result=False)
def qry(sql, writing=False):
import psycopg2
conn = psycopg2.connect(dbname='space_test', user='foo', host='localhost')
cur = conn.cursor()
cur.execute(sql)
if writing == True:
cur.close()
conn.commit()
conn.close()
return
a = cur.fetchall()
cur.close()
conn.close()
return a
--foo_tasks.py--
from __future__ import absolute_import
from celery import chain, chord, group
import celery
from cel_test.celery import app
from weather.parse_weather import parse_weather
from cel_test.common_tasks import qry
import cPickle as PKL
import csv
import requests
@app.task(ignore_results=False)
def write_the_csv(data, file_name, sql):
with open(file_name, 'wb') as fp:
a = csv.writer(fp, delimiter=',')
for i in data:
a.writerows(i)
qry.delay(sql, True)
return True
@app.task(ignore_result=False)
def idx_point_qry(DIR='/bar/foo/'):
with open(''.join([DIR, 'thing.pkl']), 'rb') as f:
a = PKL.load(f)
return [(i, a[i]) for i in a]
@app.task(ignore_results=False)
def load_page(url):
page = requests.get(url, **{'timeout': 30})
if page.status_code == 200:
return page.json()
@app.task(ignore_results=False)
def run_hourly(page, info):
return parse_weather(page, info).hourly()
@app.task(ignore_results=False)
def pool_loop(info):
data = []
for iz in info:
a = chain(load_page.s(url, iz), run_hourly.s())()
data.append(a)
return data
@app.task(ignore_results=False)
def run_update(file_name, sql, writing=True):
chain(idx_point_qry.s(), pool_loop.s(file_name, sql), write_the_csv.s(writing, sql))()
return True
--- 单独的.py 文件---
from foo_tasks import *
def update_hourly_weather():
fn = '/weather/csv_data/hourly_weather.csv'
run_update.delay(fn, "SELECT * from hourly_weather();")
return True
update_hourly_weather()
我尝试了上面列出的 30 种左右的 .py 文件组合以及其中存在的几种代码组合。我试过chords,groups,chains,从不同的任务启动任务,合并任务。
一些组合最终奏效了,但我不得不在data 上的wrtie_the_csv 任务中直接调用.get(),但 celery 抛出了一个warning,在 4.0 中调用 get() 任务会引发错误所以我认为我不应该这样做..
本质上,我的问题是(现在仍然是)糟糕的任务设计以及它们之间的流程。这导致我遇到从其他任务中同步任务的问题。
我在问题中提出的 while 循环是尝试在另一个任务状态变为 COMPLETE 时异步启动任务,然后在该任务完成时启动另一个任务等等......而不是让 celery 同步执行所以通过致电chord 或chain。我似乎发现(我不确定我对此是否正确)是,在 celery 任务中,您无法访问识别此类事物所需的范围。以下引自state portion in the celery documentation on tasks“断言世界是任务的责任”。
我正在启动该任务不知道存在的任务,因此对它们一无所知。
我的解决方案是加载和遍历.pkl 文件并同步启动一组chains 并带有回调。我基本上用下面的代码替换了pool_loop 任务,并同步而不是异步启动任务。
--the_end.py--
from foo_tasks import *
from common_tasks import *
def update_hourly_weather():
fn = '/weather/csv_data/hourly_weather.csv'
dd = []
idx = idx_point_qry()
for i in idx:
dd.append(chain(load_page.s(i, run_hourly.s(i)))
vv = chord(dd)(write_the_csv.s(fn, "SELECT * from hourly_weather();"))
return
update_hourly_weather()