【问题标题】:kubeflow pipeline dynamic output list as input parameterkubeflow 管道动态输出列表作为输入参数
【发布时间】:2020-04-14 03:26:00
【问题描述】:

我在动态列表上使用 ParallelFor。我想收集循环中的所有输出,并将它们传递给另一个 ContainerOp。
类似以下的内容,显然不起作用,因为 outputs 列表将是静态的。

with dsl.ParallelFor(op1.output) as item:
    op2 = dsl.ContainerOp(
      name='op2',
      ...
      file_outputs={
         'outputs': '/outputs.json',
    })
    outputs.append(op2.output)


op3 = dsl.ContainerOp(
   name='op3',
   ...
   arguments=['--input': outputs]  # won't work
)

【问题讨论】:

    标签: python kubeflow kubeflow-pipelines


    【解决方案1】:

    我也遇到了动态“扇出”然后使用 Kubeflow 管道“扇入”的问题。也许有点笨手笨脚,但我使用了一个安装的 PVC 声明来克服这个问题。

    Kubeflow 允许您使用VolumeOp(链接here)即时挂载已知的PVC 或创建新的PVC。这个 sn-p 展示了如何使用已知的 PVC。

        pvc_name = '<available-pvc-name>' 
        pvc_volume_name = '<pvc-uuid>' # pass the pvc uuid here
    
        # Op 1 creates a list to iterate over
        op_1 = dsl.ContainerOp(
                name='echo',
                image='library/bash:4.4.23',
                command=['sh', '-c'],
                arguments=['echo "[1,2,3]"> /tmp/output.txt'],
                file_outputs={'output': '/tmp/output.txt'})
    
        # Using withParam here to iterate over the results from op1
        # and writing the results of each step to its own PVC
        with dsl.ParallelFor(op_1.output) as item:
            op_2 = dsl.ContainerOp(
                name='iterate',
                image='library/bash:4.4.23',
                command=['sh', '-c'],
                arguments=[f"echo item-{item} > /tmp/output.txt; "  # <- write to output  
                           f"mkdir -p /mnt/{{workflow.uid}}; "  # <- make a dir under /mnt
                           f"echo item-{item}\n >> /mnt/{{workflow.uid}}"],  # <- append results from each step to the PVC
                file_outputs={'output': '/tmp/output.txt'},
                # mount the PVC
                pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)})
    
        op_3 = dsl.ContainerOp(
                name='echo',
                image='library/bash:4.4.23',
                command=['sh', '-c'],
                arguments=[f"echo /mnt/{{workflow.uid}} > /tmp/output.txt"],
                # mount the PVC again to use
                pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)},
                file_outputs={'output': '/tmp/output_2.txt'}).after(op_2)
    

    确保op_3 在来自op_2 的循环之后运行,最后使用after(op_2)

    注意:这可能是一种严厉的方法,如果 KFP 允许将其作为 KF 编译器的一部分,可能会有更好的解决方案,但我无法让它工作。如果在环境中创建 PVC 很容易,这可能适用于您的情况。

    【讨论】:

    • 我正在考虑写入共享存储,这确实有效,但我希望有一个更好的面向 kubeflow 的解决方案,因为这会迫使容器“知道”它正在循环运行
    • 无关注意:请使用component.yaml 文件而不是手动创建ContainerOp 实例。这些组件很容易创建,但可重复使用和便携。谢谢。
    【解决方案2】:

    不幸的是,方舟君的解决方案对我不起作用。但是如果我们事先知道输入的数量,有一种简单的方法可以实现扇入工作流。我们可以像这样预先计算管道 DAG:

    @kfp.components.create_component_from_func
    def my_transformer_op(item: str) -> str:
        return item + "_NEW"
    
    
    @kfp.components.create_component_from_func
    def my_aggregator_op(items: list) -> str:
        return "HELLO"
    
    
    def pipeline(array_of_arguments):
        @dsl.pipeline(PIPELINE_NAME, PIPELINE_DESCRIPTION)
        def dynamic_pipeline():
            outputs = []
            for i in array_of_arguments:
                outputs.append(my_transformer_op(str(i)).output)
            my_aggregator_op(outputs)
        return dynamic_pipeline
    
    ...
    
        run_id = client.create_run_from_pipeline_func(
            pipeline(data_samples_chunks), {},
            run_name=PIPELINE_RUN,
            experiment_name=PIPELINE_EXPERIMENT).run_id
    

    【讨论】:

      【解决方案3】:

      这只有在您事先知道输入/输出时才有效,因此它不是真正动态的。但它确实解决了我当前的用例。

      实际上我发现这样管理管道非常困难,我建议您看看 Ploomber https://github.com/ploomber/ploomber/?ref=stacko

      设置管道和依赖项非常容易,并且它与大多数提供程序(气流、argo 等)集成。我知道他们目前正在开发 Kubeflow 连接器(类似于 Kale,但更简单)。 无论如何,它确实让我的生活更轻松了。

      【讨论】:

        【解决方案4】:

        问题是op3 没有正确引用op2 的输出作为输入参数。试试这个:

        op3 = dsl.ContainerOp(
            ...
            arguments=['--input': op2.outputs['outputs']]
        )
        

        【讨论】:

        • 这不会只引用最后一个 op2 吗?因为有多个。或者这里有一些 kubeflow 魔法
        • 是的,前段时间我确实在文档中看到了这一点。我最近没有检查它。不过,我为此写了一篇博客:ekababisong.org/kubeflow-for-poets
        • invalid spec: templates.for-loop-for-loop-1a944bdb-1.outputs failed to resolve {{tasks.test-step-2.outputs.parameters.test-step-2-output}}
        猜你喜欢
        • 2020-12-16
        • 2010-12-16
        • 1970-01-01
        • 2021-02-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-07-11
        相关资源
        最近更新 更多