【问题标题】:What is the cleanest way to write and run a DAG of tasks?编写和运行 DAG 任务的最简洁方法是什么?
【发布时间】:2019-10-27 16:30:17
【问题描述】:

我想编写并运行一个有向无环图 (DAG),其中包含多个串行或并行运行的任务。理想情况下,它看起来像:

def task1():
    # ...

def task2():
    # ...

graph = Sequence([
    task1,
    task2,
    Parallel([
        task3,
        task4
    ]),
    task5
]

graph.run()

它将运行 1 -> 2 ->(3 和 4 同时运行)-> 5. 任务需要访问全局范围来存储结果、写入日志和访问命令行参数。

我的用例是编写部署脚本。 并行任务受 IO 限制:通常在远程服务器上等待完成一个步骤。

我研究了 threading、asyncio、Airflow,但没有找到任何简单的库可以在没有一些样板代码的情况下遍历并控制图形的执行。有这样的东西吗?

【问题讨论】:

  • 编写队列任务的sequenceparallel 函数可能相当简单。 sequence 基本上只是一个for 循环,parallel 只是将任务交给multiprocessing.Pool.map
  • 这些任务是受 IO 限制还是受 CPU 限制?如果是后者,您将需要深入多处理兔子洞,并可能下载该模块的替代版本。内置的multiprocessing 有严重的局限性。
  • IO 绑定(在远程服务器上等待完成部署步骤)
  • 可能不会,考虑到用例,但您会拥有大量 Parallel 对象(如运行时每秒数百个)吗?可同时运行的任务总数会超过一千个吗?
  • 这个问题的解决方案但是在java中:stackoverflow.com/q/63354899/1925388

标签: python concurrency directed-acyclic-graphs


【解决方案1】:

这是一个快速的概念验证实现。它可以像这样使用:

graph = sequence(
            lambda: print(1),
            lambda: print(2),
            parallel(
                lambda: print(3),
                lambda: print(4),
                sequence(
                    lambda: print(5),
                    lambda: print(6))),
             lambda: print(7)

graph()

1
2
3
5
6
4
7

sequence 产生一个包装for 循环的函数,parallel 产生一个包装线程池使用的函数:

from typing import Callable
from multiprocessing.pool import ThreadPool

Task = Callable[[], None]

_pool: ThreadPool = ThreadPool()

def sequence(*tasks: Task) -> Task:
    def run():
        for task in tasks:
            task()

    return run  # Returning "run" to be used as a task by other "sequence" and "parallel" calls

def parallel(*tasks: Task) -> Task:
    def run():
        _pool.map(lambda f: f(), tasks)  # Delegate to a pool used for IO tasks

    return run

sequenceparallel 的每次调用都会返回一个新的“任务”(一个不带参数且不返回任何内容的函数)。然后可以通过对sequenceparallel 的其他外部调用来调用该任务。

关于ThreadPool的注意事项:

  • 虽然这确实为parallel 使用了一个线程池,但由于 GIL,它仍然一次只执行一件事。这意味着parallel 对于 CPU 密集型任务基本上没有用处。

  • 我没有指定池应该从多少线程开始。我认为它默认为您可用的核心数量。如果需要更多,可以使用ThreadPool 的第一个参数指定要开始的数量。

  • 为简洁起见,我不会清理ThreadPool。如果你使用这个,你绝对应该这样做。

  • 尽管ThreadPoolmultiprocessing 的一部分,但令人困惑的是,它使用的是线程而不是进程。

【讨论】:

  • @Carcigenicate 你说应该清理线程池是指什么?
  • @haidahaida 理想情况下,线程池需要手动关闭。完成后手动调用池上的shutdown
【解决方案2】:

你提到你的任务是 IO 绑定的,这意味着 asycnio 将是一个很好的候选者。你可以试试aiodag 库,这是一个基于 asycnio 的非常轻量级的接口,可以让你轻松定义异步 dag:

import asyncio
from aiodag import task

@task
async def task1(x):
    ...

@task
async def task2(x):
    ...

@task
async def task3(x):
    ...

@task
async def task4(x):
    ...

@task
async def task5(x, y):
    ...

# rest of task funcs

async def main():
    t1 = task1()
    t2 = task2(t1)
    t3 = task3(t2)  # t3/t4 take t2, when t2 finishes, will run concurrently
    t4 = task4(t2)
    t5 = task5(t3, t4) # will wait until t3/t4 finish to execute
    await t5

loop = asyncio.new_event_loop()
asyncio.run_until_complete(main())

查看 aiodag 的 github 页面上的自述文件,了解有关如何构建/优化执行 dag 的一些详细信息。 https://github.com/aa1371/aiodag

如果您不想被绑定到异步函数,请查看 dask 的延迟接口。 dag 的定义与 aiodag 的定义相同,其中 dag 由函数调用构造。 Dask 将在最佳并行方案中无缝处理您的 dag,并且可以分布在任意大的集群上以执行并行执行。

https://docs.dask.org/en/latest/delayed.html

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-02-07
    • 2010-12-08
    • 1970-01-01
    • 1970-01-01
    • 2010-12-06
    相关资源
    最近更新 更多