【问题标题】:Celery calling different function and continue the chaining process芹菜调用不同的函数并继续链接过程
【发布时间】:2016-07-28 06:21:48
【问题描述】:

我的任务链包含三个任务 fetch_page、check_source 和 store page

def update_page_info(**headers):
    chain=fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s()
    chain().apply_async()

fetch_page 获取页面并收集它需要收集的内容:

@app.task(bind=True)
def fetch_page(self,url):
    #fetch_page here and return a tuple so that it can be unpacked
    # dosomething

现在获取页面后,它会在下一个任务 check_source 中检查源。

@app.task(bind=True)
def check_source(self,page_and_url,handle):
    try:
        #unpack your stuffs here
        page,url=page_and_url
        get_result={}

        if handle=='first_option':
            get_result=select_first_option(one,two)
            return get_result

        elif handle=='second_option':
            get_result=select_second_option(one,two)
            return (get_result)

        elif handle=='third_option':
            get_result=select_third_option(one,two)
            return (get_result)
        else:
            return "IGNORE FOR NOW"
    except Exception as exc:
        pass

所以困惑是我可以从这里调用一些其他任务吗?会不会有任何不一致,或者工人会不会在这样做时陷入僵局?

最后它应该执行 store_info() 来存储从 check_source() 返回的东西

@app.task(bind=True)
def store_info(self,result):
    print ("store_info ")
    try:
        #store the fetched pages

    except Exception as exc:
        #dosomething
    finally:
        pass

我正在遵循这种方法,只需要稍加修改http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks

任何人都可以建议我应该如何做以及我需要更加小心的事情吗?

【问题讨论】:

    标签: python python-2.7 celery celery-task


    【解决方案1】:

    这一切都应该像您正在阅读(和交流)一样工作。这三个任务将按顺序执行,没有任何“不一致”。

    如果您调用update_page_info 一次,三个链接的子任务将彼此独占运行。也就是说,这种设置仍有可能出现死锁。如果您调用update_page_info,而您上次调用它时的先前任务正在运行,那么您可以同时运行多个任务。这将引入死锁的可能性,具体取决于您的任务共享资源的方式。

    如果您的任务共享资源,我建议使用 redis 或 memcached 之类的东西作为跨工作人员的锁定系统。

    编辑:我现在看到的代码完全没有问题,因为结果作为参数传递给下一个任务。

    【讨论】:

    • 到目前为止,我一直在测试这个及其工作文件的几个任务,只是担心工人是否超载,我想我需要用更多的任务来测试它。谢谢。
    • 测试是一个很好的起点!您的场景是否允许多个工作人员一次执行相同的任务?如果这是一个问题,那么您需要确保一次只能执行一项任务。您可以使用 celery 工作进程共享的互斥锁/信号量来执行此操作。你到底想做什么?
    • 现在我只使用一个工人来执行相同的任务。我打算使用 2/3 的工作人员并且不知道互斥量/信号量。我只是想抓取并保存结果。
    • 应该没问题! :)
    猜你喜欢
    • 2019-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-14
    • 2019-09-02
    • 2015-09-30
    相关资源
    最近更新 更多