【问题标题】:AWS Step Function - Wait until an eventAWS Step Function - 等到一个事件
【发布时间】:2017-01-18 07:13:47
【问题描述】:

我有一个用例,其中我有一个 AWS Step 函数,该函数在文件上传到 S3 时触发,第一步从那里运行 ffprobe 以从外部服务(例如 transloadit)获取文件的持续时间,其中输出写回 S3。

我可以从该事件中创建一个新的步进函数,但我正在徘徊,是否可以在原始步进函数中有一个 Await 承诺,然后继续下一个 - 考虑到 ffprobe 可能需要更长的时间卷土重来。

非常感谢任何有关如何解决此问题的建议。

【问题讨论】:

    标签: aws-lambda aws-step-functions


    【解决方案1】:

    AWS Step Functions 现在支持将长时间运行的步骤作为一流的异步回调。

    这类似于上面@mixja 的答案,但简化了。工作流中的单个状态可以直接调用 Lambda、SNS、SQS 或 ECS 并等待对 SendTaskSuccess 的调用。

    有一个为 SQS 记录的good example,其中一个步骤函数发送一条消息并暂停工作流执行,直到有东西提供回调。 Lambda 将是等效的(假设像 transloadit 这样的主要处理发生在 Lambda 本身之外)

    您的步进函数定义如下所示

    "Invoke transloadit": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "InvokeTransloadit",
        "Payload": {
            "some_other_param": "...",
            "token.$": "$$.Task.Token"
         }
      },
      "Next": "NEXT_STATE"
    }
    

    然后在你的 Lambda 中你会做类似的事情

    def lambda_handler(event, context):
        token = event['token']
    
        # invoke transloadit via SSM, ECS, passing token along
    

    然后在您的主要长期运行过程中,您将使用来自 shell 脚本/CLI 的 aws stepfunctions send-task-success --task-token $token 之类的令牌发出回调,或通过 API 调用发出类似的回调。

    【讨论】:

      【解决方案2】:

      当您将请求发送到 transloadit 时,将 s3 中步骤的 taskToken 保存在基于上传的文件密钥的可预测密钥中。例如,如果媒体文件位于“s3://my-media-bucket/foobar/media-001.mp3”,您可以制作一个包含当前步骤的任务令牌的 JSON 文件,并使用相同的密钥存储它在不同的存储桶中,例如“s3://ffprobe-tasks/foobar/media-001.mp3.json”。在将媒体发送到 transloadit 的步骤结束时,不要在步骤上调用成功或失败 - 让它继续运行。

      然后当你收到s3通知transloadit结果准备好时,你可以确定s3 key来获取任务令牌('s3://ffprobe-tasks/foobar/media-001.json'),加载JSON (并将其从 s3 中删除)并发送该任务的成功。 step 函数在执行中会继续到下一个状态。

      【讨论】:

        【解决方案3】:

        您通常希望将异步任务作为 Step Function 活动启动。这里的关键字是initiate - 换句话说,一旦你的活动有一个待处理的动作,那就是你触发你的异步动作的时候。这样做的原因是您需要与待处理活动关联的任务令牌 - 然后只要您的“未来”可以以某种方式包含此令牌(例如,您可以将其设置为参考或请求 ID),那么您就可以“完成”使用SendTaskSuccessSendTaskFailure 调用成功或失败的活动。

        启动任务有两种方法:

        1. 轮询新活动。您将设置 CloudWatch 计划事件,以每 n 分钟拨打一次GetActivityTask

        2. 在 step 函数中与您的活动并行触发一个新的“启动器”任务。这个发起者执行与#1 相同的调用,并进行GetActivityTask 调用,唯一的区别是它立即触发并且不需要轮询机制。 GetActivityTask 调用会阻塞,直到有新的活动任务可用,因此不存在竞争条件问题。请注意,您可能会从另一个执行中获取活动,因此该发起者只需要考虑活动的输入,而不是发起者本身接收的输入。

        这是步骤函数中 #2 的样子:

        以及与 InitiateManualApprovalActivity 任务相关的基本代码示例:

        import boto3
        import time
        
        client = boto3.client('stepfunctions')
        activity = "arn:aws:states:us-east-1:123456789012:activity:ManualStep"
        
        def lambda_handler(event, context):
            print(event)
            # This will block until an activity task becomes available
            task = client.get_activity_task(activityArn=activity, workerName="test")
            print(task)
            # Perform your task here
            # In this example we continue on in the same function,
            # but the continuation could be a separate event, 
            # just as long as you can retrieve the task token
            time.sleep(60)
            response = client.send_task_success(taskToken=task['taskToken'], output=task['input'])
            print(response)
            return "done"
        

        【讨论】:

          【解决方案4】:

          无法提出简单的解决方案,只能探索几个方向。

          首先,Step Functions 有一种特定的方式来处理长时间运行的后台工作:活动。 https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html 基本上就是一个队列。

          如果您想要 100% 无服务器,这将变得复杂或丑陋。

          • 如您所说,为每个文件创建新的步进函数
          • 或者,状态机中的 S3 轮询循环使用自定义错误代码和 Retry 子句

          如果您可以为后台工作人员分配“1/8 微”实例,它并不优雅但很容易,并且可以通过即时反应来实现。低硬件要求暗示我们将只使用机器进行同步。

          定义 StepFunction 活动,例如命名为 video-duration。 定义 SQS 队列以进行即时反应或轮询 S3 以获取持续时间结果。

          状态函数伪代码:

          {
            StartAt: ffprobe
            ffprobe: {
              Type: Task
              Resource: arn:...lambda:launch-ffprobe
              Next: wait-duration
            }
            wait-duration: {
              Type: Task
              Resource: arn...activity:video-duration
              End: true
            }
          }
          

          后台工作人员伪代码:

          statemap = dict/map filename to result
          
          thread1:
            loop:
              taskToken, input = SF.GetActivityTask('video-duration')  # long poll
              sync(key=input.filename, waiter=taskToken)
          thread2:
            loop:
              msg = SQS.ReceiveMessage(...)  # or poll S3
              sync(key=msg.filename, duration=msg.result)
          
          function sync(key, waiter, duration):
            state = statemap[key]
            if waiter:
              state.waiter = waiter
            if duration:
              state.duration = duration
            if state.waiter and state.duration:
              SF.SendTaskSuccess(state.waiter, state.duration)
          

          S3 触发伪代码:

          if filename is video:
            SF.StartExecution(...)
          else if filename is duration:
            content = S3.GetObject(filename)
            SQS.SendMessage(queue, content)
          

          【讨论】:

            【解决方案5】:

            好吧,我会从https://aws.amazon.com/blogs/compute/implementing-serverless-manual-approval-steps-in-aws-step-functions-and-amazon-api-gateway/ 中激发自己

            您可以将其中的 API Gateway 替换为 AWS Lambda 函数,例如由 S3 事件触发(文档:http://docs.aws.amazon.com/lambda/latest/dg/with-s3.html)。只需确保您的任务有适当的超时时间。

            【讨论】:

              【解决方案6】:

              当我尝试结合 SFN 来编排 AWS Batch 作业时,我也遇到了这个问题。 上面建议的做法是有问题的,因为你应该传递 taskToken,所以你需要从状态机中的 lambda inside 从队列中轮询 TaskToken,并将其传递给 S3 或其他地方,即另一个 lambda 将提交活动状态。

              问题是:当您轮询 taskToken 时,您无法知道它是否属于您的状态机实例。 您可以改为在同一状态机的另一个实例上获取令牌。 就个人而言,我认为如果 AWS 能够支持这个功能,那就太好了,他们可以轻松做到......

              【讨论】:

                【解决方案7】:

                如果您知道 transloadit 完成后会将文件放在 S3 中的哪个位置,您可以循环轮询 S3。要进行投票,您可以使用HeadObject,然后检查响应的状态代码。

                AWS Step Function documentation 中的一个示例项目中描述了这种轮询循环。您可以按照here 的描述直接请求 S3 API,而不是使用您必须为执行付费的 Lambda。如果没有 Lambda,您只需 pay for state transitions in the Standard Workflow

                【讨论】:

                  猜你喜欢
                  • 1970-01-01
                  • 1970-01-01
                  • 2022-01-17
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  • 2022-01-17
                  • 1970-01-01
                  • 1970-01-01
                  相关资源
                  最近更新 更多