【问题标题】:AWS - Step functions, use execution input within a TuningStepAWS - 步骤函数,在 TuningStep 中使用执行输入
【发布时间】:2021-02-17 05:16:42
【问题描述】:

我已经编写了一个简单的 AWS step functions 工作流程,只需一步:

from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Chain, TuningStep
from stepfunctions.workflow import Workflow
import train_utils


def main():
    workflow_execution_role = 'arn:aws:iam::MY ARN'
    execution_input = ExecutionInput(schema={
        'app_id': str
    })
    estimator = train_utils.get_estimator()
    tuner = train_utils.get_tuner(estimator)

    tuning_step = TuningStep(state_id="HP Tuning", tuner=tuner, data={
        'train': f's3://my-bucket/{execution_input["app_id"]}/data/'},
                             wait_for_completion=True,
                             job_name='HP-Tuning')

    workflow_definition = Chain([
        tuning_step
    ])

    workflow = Workflow(
        name='HP-Tuning',
        definition=workflow_definition,
        role=workflow_execution_role,
        execution_input=execution_input
    )
    workflow.create()


if __name__ == '__main__':
    main()

我的目标是从运行时提供的执行 JSON 中提取训练输入。当我执行工作流时(从步骤函数控制台),提供 JSON {"app_id": "My App ID"} 时,调整步骤不会获得正确的数据,而是获得stepfunctions.inputs.placeholders.ExecutionInput 的 to_string 表示。此外,在查看生成的 ASL 时,我可以看到执行输入被呈现为字符串:

... 
"DataSource": {
   "S3DataSource": {
   "S3DataType": "S3Prefix",
   "S3Uri": "s3://my-bucket/<stepfunctions.inputs.placeholders.ExecutionInput object at 0x12261f7d0>/data/",
    "S3DataDistributionType": "FullyReplicated"
    }
},
...

我做错了什么?

更新: 正如@yoodan 提到的,SDK 可能落后了,所以我必须在调用 create 之前编辑定义。我可以看到有一种方法可以在调用 create 之前查看定义,但是我可以修改图形定义吗?怎么样?

【问题讨论】:

标签: python machine-learning state-machine aws-step-functions hyperparameters


【解决方案1】:

SDK 似乎不支持您要查找的内容。 https://github.com/aws/aws-step-functions-data-science-sdk-python/issues/79

但是,您可以在创建状态机之前更改定义。这是一个函数,它将迭代定义字典并用正确的内在函数语法替换用 {{PH}} 包围的占位符,例如:

s3://my-bucket/{{app_id}}/data/

def get_updated_definition(data):
    if isinstance(data, dict):
        for k, v in data.copy().items():
            if isinstance(v, dict):  # For DICT
                data[k] = get_updated_definition(v)
            elif isinstance(v, list):  # For LIST
                data[k] = [get_updated_definition(i) for i in v]
            elif isinstance(v, str) and re.search(r'{{([a-z_]+)}}', v):  # Update Key-Value
                # data.pop(k)
                # OR
                del data[k]
                keys = re.findall(r'{{([a-z_]+)}}', v)
                data[f"{k}.$"] = f"States.Format('{re.sub(r'{{[a-z_]+}}', '{}', v)}',{','.join(['$.'+k for k in keys])})"

    return data

用法:

workflow_definition = get_updated_definition(workflow.definition.to_dict())
create_state_machine(json.dumps(workflow_definition)) #<-- implement using boto3 (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.create_state_machine)

【讨论】:

    【解决方案2】:

    您对步进函数的定义很好,看起来应该可以工作。
    从您的代码示例中不清楚执行实际发生的位置(您直接从控制台说明),这可能导致可能导致问题的几个选项,我相信这就是问题的根源。
    请提供更多相关信息。

    您是否以某种方式导出您创建的Workflow 对象以便执行它?
    似乎缺少某些东西...作为健全性检查,请将以下内容附加到您的 main 函数中:

    workflow.execute(inputs={"app_id": "My App ID"})
    

    然后再次检查日志

    【讨论】:

      【解决方案3】:

      步骤函数的python SDK生成相应的代码,我们需要亚马逊状态语言内置的字符串连接/格式来完成你想要的。

      最近在 2020 年 8 月,Amazon States Language 在其语言规范中引入了字符串格式等内置函数。 https://states-language.net/#appendix-b

      很遗憾,python SDK 不是最新的,不支持新的更改。

      作为一种解决方法,是否可以在调用工作流创建之前手动修改定义?

      【讨论】:

      • 谢谢@yoodan,我尝试了你提到的内在函数,如果我手动操作,效果很好。 modify the definition before calling workflow create? 不创建生成定义是什么意思?
      猜你喜欢
      • 2021-11-20
      • 2022-10-07
      • 2021-04-13
      • 1970-01-01
      • 2019-12-03
      • 2021-08-31
      • 1970-01-01
      • 2021-08-02
      • 1970-01-01
      相关资源
      最近更新 更多