【问题标题】:How can you ensure, that the same pipeline is not executed twice at the same time您如何确保同一管道不会同时执行两次
【发布时间】:2021-02-28 19:01:48
【问题描述】:

嘿 :) 我对锁定或互斥行为有疑问。

场景

让我们假设以下场景:

  1. 管道正在处理一些本地文件。这些文件由 CI-CD 作业放置。处理后我想删除文件。如果作业花费的时间比计划间隔长,这将导致竞争条件
  2. 两条流水线占用大量资源,因此无法并行运行。

可能的解决方案

  • 目前我会在正在运行的服务中使用某种互斥锁或锁,其中管道可以注册并允许执行或不执行。
  • 复制数据以确保每个工作流都可以清理和使用自己的数据。
  • 创建本地锁定文件,并确保成功后将删除该文件。
  • 创建一个较小的计划间隔并检查是否存在锁定。如果条件不满足,则干净退出。

我知道这可能不是 dagster 的正常用例,但我也想将 dagster 用于其他工作流程,例如清理任务和触发其他管道。

谢谢

【问题讨论】:

    标签: python dagster


    【解决方案1】:

    我不熟悉 dagster,但我在其他环境中成功使用的一种机制是利用在类 Unix 系统中 rename 或 mv 是原子操作的事实。对于运行后清理的第一个要求:

    1. 新文件被放入输入目录。一组输入文件可以被隔离在它们自己的目录中。

    2. 当管道进程启动时,它的第一个操作是从输入目录中选择一个文件(或目录)并将其 mv 到管道实例拥有的工作目录中。如果输入目录中没有可用的文件,则进程会优雅地自行关闭。

    3. 如果 mv 成功,则进程继续对刚刚移动到其工作目录的文件(目录)执行其操作。完成后,它会自行清理,可能通过对其工作目录执行递归删除。

    4. 如果 mv 失败,则意味着另一个进程从该文件下抓取了新文件。失败的进程会优雅地自行关闭。

    对于一次只运行一个管道进程的第二个要求,您可以使用独占创建哨兵文件,如果没有成功创建哨兵文件,则进程失败并退出。在 python 3 中,代码可能类似于

    try:
        open('sentinel', 'x').close()
    except FileExistsError:
        exit("someone else already has sentinel")
    
    do_stuff()
    
    os.remove('sentinel')
    

    当然,如果你的进程在 do_stuff() 中的某个地方崩溃了,你必须手动清理标记文件,或者你可以使用 atexit 处理程序来确保即使在崩溃的情况下也会删除标记在 do_stuff() 中。

    【讨论】:

    • 确实,这正是我处理此类问题的第一种方法。感谢分享实际代码。同样,我只是想知道这种功能是否已经实现,或者在 dagster (dagster.io) 中是否已经提供了替代解决方案
    【解决方案2】:

    感谢您分享您的用例。我不认为 Dagster 目前原生支持这些功能。然而,0.10.0 版本(几个月后)将包括运行级队列,允许您对并发管道运行设置限制。目前它仅支持运行的全局限制,但很快将支持基于管道标签添加规则(例如,标记为“资源重”的管道可能限制为 3 个并发运行)。看起来这可能适合这个用例?

    预览当前排队系统的指南是here。也请随时通过 @johann 的 Dagster 闲置与我联系!

    【讨论】:

      【解决方案3】:

      对于场景 #2(处理资源非常繁重且无法并行运行的管道)的建议是使用 dagster 的 Celery 集成,例如 celery_executorcelery_docker_executor 或 @987654324 @(如果你在 kubernetes 上)。

      这些工作的方式是 Dagster 管道运行协调器会将每个可靠的执行任务添加到 Celery 队列中,并且 Celery 允许您限制每个队列中活动任务的数量。例如,这通常用于确保在给定时间只有 X 个实体连接到 Redshift。

      Dagster 还支持使用多个队列,因此您可以为资源密集型实体创建一个队列,为非资源密集型实体创建另一个队列(具有更高的并发限制)。

      关于场景 #1,我不确定您的设计限制是什么。一个想法是使用管道运行标记的标记方案来跟踪哪个管道运行对应于哪个文件;然后对于每个文件,执行文件清理的进程在删除之前首先验证是否存在成功的管道运行(通过查询运行数据库)。

      【讨论】:

        猜你喜欢
        • 2017-07-31
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-10-20
        • 2013-05-07
        • 1970-01-01
        相关资源
        最近更新 更多