【问题标题】:Dagster -Execute an @Op only when all parallel executions are finished(DynamicOutput)Dagster - 仅当所有并行执行完成时才执行@Op(动态输出)
【发布时间】:2023-01-13 03:13:17
【问题描述】:

我有一个问题,实际上我无法在 dagster 中解决。

我有以下配置:

我在第 1 步中从端点获取数据

第 2 步动态获取客户列表:

第 3 步是使用来自第 1 步的响应对来自第 2 步的每个客户进行并行更新的数据库。

在调用步骤 3 之前,我有一个函数用于为步骤 2 的每个客户端创建 DynamicOutput,名称为“parallelize_clients”,以便在调用它时并行化步骤 3 的更新过程,最后我有一个图表来加入操作.

@op()
def step_1_get_response():
    return {'exemple': 'data'}

@op()
def step_2_get_client_list():
    return ['client_1', 'client_2', 'client_3'] #the number of customers is dynamic.

@op(out=DynamicOut())
def parallelize_clients(context, client_list):
    for client in client_list:
        yield DynamicOutput(client, mapping_key=str(client))


@op()
def step_3_update_database_cliente(response, client):
    ...OPERATION UPDATE IN DATABASE CLIENT

@graph()
def job_exemple_graph():
    response = step_1_get_response()
    clients_list = step_2_get_client_list()
    clients = parallelize_clients(clients_list)
    #run the functions in parallel
    clients.map(lambda client: step_3_update_database_cliente(response, client))

根据文档,@Op 在其依赖关系得到满足后立即启动,对于没有依赖关系的 Ops,它们会立即执行,而没有确切的执行顺序。示例:我的 step1 和 step2 没有依赖关系,因此两者自动并行运行。客户端返回后,执行“parallelize_clients()”函数,最后,我在图中有一个映射,根据客户端的数量动态创建多个执行(DynamicOutput)

到目前为止它有效,一切都很好。这就是问题所在。我只需要在step3完全完成后才执行一个特定的功能,并且由于它是动态创建的,因此会并行生成多个执行,但是,我无法控制仅在所有这些并行执行完成后才执行一个功能。

在图中,我试图在最后调用操作“exemplolasststep() step_4”,但是,步骤 4 与“step1”和“step2”一起执行,我真的希望 step4 仅在 step3 之后执行,但是不是我能以某种方式让它工作。有人可以帮我吗?

我试图用


@op(ins={"start": In(Nothing)})
def step_4():
    pass

在图中,调用操作时,我尝试在 step_4() 函数调用中执行 map 调用;例子

@graph()
def job_exemple_graph():
    response = step_1_get_response()
    clients_list = step_2_get_client_list()
    clients = parallelize_clients(clients_list)
    #run the functions in parallel
    step_4(start=clients.map(lambda client: step_3_update_database_cliente(response, client)))

我也尝试过其他方法,但无济于事。

【问题讨论】:

    标签: dagster


    【解决方案1】:

    您只需要在图中的映射函数上添加 .collect() 调用,以指示所有并行操作都应该在继续之前加入。就像是

    @graph()
    def job_exemple_graph():
        response = step_1_get_response()
        clients_list = step_2_get_client_list()
        clients = parallelize_clients(clients_list)
        # run the functions in parallel
        step_4(
            start=clients.map(
                lambda client: step_3_update_database_cliente(response, client)
            ).collect()
        )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-10-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-14
      • 2020-12-09
      相关资源
      最近更新 更多