【Question Title】: AWS Batch Job Execution Results in Step Function
【Posted】: 2021-04-26 08:54:28
【Question】:

I am new to AWS Step Functions and AWS Batch. I am trying to integrate an AWS Batch job with a Step Function. The Batch job runs a simple Python script that outputs a string value (a simplified version of the real requirement). I need to make the Python script's output available to the next state of the step function. How should I do this? The AWS Batch job output does not contain the result of the Python script; instead, it contains all the container-related information along with the input values.

Example: the AWS Batch job runs a Python script that outputs "Hello World". I need "Hello World" to be available to the next state of the step function, so it can be passed to the lambda associated with that state.

【Comments】:

    Tags: python amazon-web-services aws-step-functions aws-batch


    【Solution 1】:

    You can pass the Step Functions execution ID ($$.Execution.Id) to the Batch job; the job can then write its response to DynamoDB using the execution ID as the primary key (or to a file somewhere else). You then need a follow-up step that reads directly from DynamoDB and captures the job's response.

    I have been looking for a way to do this without that follow-up step, but so far no luck.
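    A minimal sketch of that pattern, assuming a hypothetical DynamoDB table named batch-job-results with partition key execution_id, and that the state machine passed the execution ID into the container as an environment variable (e.g. via ContainerOverrides with "Value.$": "$$.Execution.Id"):

```python
import os

TABLE_NAME = "batch-job-results"  # hypothetical table, partition key: execution_id

def build_result_item(execution_id, result):
    # Shape the item in DynamoDB's attribute-value format
    return {
        "execution_id": {"S": execution_id},
        "result": {"S": result},
    }

def write_result(result):
    # Runs inside the Batch container: persist the script's output
    # under the execution ID passed in through the environment.
    import boto3  # imported lazily so the helper above stays testable offline
    execution_id = os.environ["SFN_EXECUTION_ID"]  # hypothetical env var name
    boto3.client("dynamodb").put_item(
        TableName=TABLE_NAME,
        Item=build_result_item(execution_id, result),
    )

def read_result(execution_id):
    # Runs in the follow-up Lambda state: fetch the row the Batch job wrote.
    import boto3
    resp = boto3.client("dynamodb").get_item(
        TableName=TABLE_NAME,
        Key={"execution_id": {"S": execution_id}},
    )
    return resp["Item"]["result"]["S"]
```

    The follow-up state would receive the execution ID the same way (e.g. "ExecutionId.$": "$$.Execution.Id" in its Parameters) and call read_result with it.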

    【Discussion】:

      【Solution 2】:

      I was able to do it. Below is my state machine. I took the sample project for running a batch job, Manage a Batch Job (AWS Batch, Amazon SNS), and modified it with two lambdas to pass input/output.

      {
        "Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion",
        "StartAt": "Submit Batch Job",
        "TimeoutSeconds": 3600,
        "States": {
          "Submit Batch Job": {
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
              "JobName": "BatchJobNotification",
              "JobQueue": "arn:aws:batch:us-east-1:1234567890:job-queue/BatchJobQueue-737ed10e7ca3bfd",
              "JobDefinition": "arn:aws:batch:us-east-1:1234567890:job-definition/BatchJobDefinition-89c42b1f452ac67:1"
            },
            "Next": "Notify Success",
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "Next": "Notify Failure"
              }
            ]
          },
          "Notify Success": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:1234567890:function:readcloudwatchlogs",
            "Parameters": {
              "LogStreamName.$": "$.Container.LogStreamName"
            },
            "ResultPath": "$.lambdaOutput",
            "Next": "ConsumeLogs"
          },
          "ConsumeLogs": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:1234567890:function:consumelogs",
            "Parameters": {
              "randomstring.$": "$.lambdaOutput.logs"
            },
            "End": true
          },
          "Notify Failure": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sns:publish",
            "Parameters": {
              "Message": "Batch job submitted through Step Functions failed",
              "TopicArn": "arn:aws:sns:us-east-1:1234567890:StepFunctionsSample-BatchJobManagement17968f39-e227-47ab-9a75-08a7dcc10c4c-SNSTopic-1GR29R8TUHQY8"
            },
            "End": true
          }
        }
      }
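      As an aside, if the downstream states only need the log stream name, the large Batch response can be trimmed at the source with ResultSelector. A hypothetical variant of the Submit Batch Job state above (the later states would then read LogStreamName from the trimmed result rather than from $.Container.LogStreamName):

```json
"Submit Batch Job": {
  "Type": "Task",
  "Resource": "arn:aws:states:::batch:submitJob.sync",
  "Parameters": {
    "JobName": "BatchJobNotification",
    "JobQueue": "arn:aws:batch:us-east-1:1234567890:job-queue/BatchJobQueue-737ed10e7ca3bfd",
    "JobDefinition": "arn:aws:batch:us-east-1:1234567890:job-definition/BatchJobDefinition-89c42b1f452ac67:1"
  },
  "ResultSelector": {
    "LogStreamName.$": "$.Container.LogStreamName"
  },
  "Next": "Notify Success"
}
```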
      

      The key to reading the logs is that the Submit Batch Job output contains the LogStreamName. I pass it to a lambda named function:readcloudwatchlogs, which reads the logs, and then pass the log contents on to the next function, named function:consumelogs. You can see the consumelogs function printing the logs in the attached screenshot.

      
      {
        "Attempts": [
          {
            "Container": {
              "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
              "ExitCode": 0,
              "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
              "NetworkInterfaces": [],
              "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b"
            },
            "StartedAt": 1611329367577,
            "StatusReason": "Essential container in task exited",
            "StoppedAt": 1611329367748
          }
        ],
        "Container": {
          "Command": [
            "echo",
            "Hello world"
          ],
          "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
          "Environment": [
            {
              "Name": "MANAGED_BY_AWS",
              "Value": "STARTED_BY_STEP_FUNCTIONS"
            }
          ],
          "ExitCode": 0,
          "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
          "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
          "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b",
      ..
        },
      ..
        "Tags": {
          "resourceArn": "arn:aws:batch:us-east-1:1234567890:job/d36ba07a-54f9-4acf-a4b8-3e5413ea5ffc"
        }
      }
      
      
      • Read-logs Lambda code:
      import boto3
      
      client = boto3.client('logs')
      
      def lambda_handler(event, context):
          print(event)
          # Fetch the events from the Batch job's CloudWatch log stream
          response = client.get_log_events(
              logGroupName='/aws/batch/job',
              logStreamName=event.get('LogStreamName')
          )
          # Return the first event's message (the script's "Hello World" line)
          log = {'logs': response['events'][0]['message']}
          return log
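      One caveat with get_log_events: it returns at most 1 MB (or 10,000 events) per call, so a job that logs more than a few lines may need to page through with nextForwardToken. A sketch, with the client passed in so it can be stubbed (collect_log_messages is a hypothetical helper, not part of the answer above):

```python
def collect_log_messages(client, log_group, log_stream):
    """Join every message in a log stream, following pagination tokens."""
    messages, token = [], None
    while True:
        kwargs = {
            "logGroupName": log_group,
            "logStreamName": log_stream,
            "startFromHead": True,  # read oldest-first so pages come in order
        }
        if token:
            kwargs["nextToken"] = token
        resp = client.get_log_events(**kwargs)
        messages.extend(e["message"] for e in resp["events"])
        # get_log_events signals the end of the stream by returning
        # the same forward token that was passed in
        if resp["nextForwardToken"] == token:
            break
        token = resp["nextForwardToken"]
    return "\n".join(messages)
```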
      
      • Consume-logs Lambda code:
      print('Loading function')
      
      
      def lambda_handler(event, context):
          print(event)
          # 'randomstring' carries the log text passed in by the state machine
          print(event.get('randomstring'))

      【Discussion】:

      • So basically you read the value from the logs. There is no other way?
      • Yes. I looked at the Step Function Callback pattern, but that involves adding a wait to the definition, which I wanted to avoid.
      • That's an innovative approach. The use case I'm working on can generate a lot of logs, and reading the logs to find the job's output might be counterproductive, but this is helpful. An alternative would be to save the output to a table and reference it from another state.
      • I was just building on the question as asked, so I was able to do exactly what it described. As long as you stay under the AWS Step Functions payload size of 256KB, you can still pass the logs along. Of course you could do it the other way, but again that involves adding a wait to your state machine, which as I mentioned I avoided.
      • As for saving the output to a table, you could simply use Lambda with CloudWatch Logs and save it to a table from there. But then you have to wait in the state machine, and you can never be sure when the data has been written to the table, which can lead to race conditions.