【问题标题】:AWS Step Functions SendSQSMessage: Dynamic MessageGroupIdAWS Step Functions SendSQSMessage:动态 MessageGroupId
【发布时间】:2021-06-10 19:29:59
【问题描述】:

我正在使用 AWS CDK 创建一个状态机,该状态机将消息发送到 fifo 队列并等待来自 lambda worker 的回调以继续执行。

我希望发送到 fifo 队列的消息具有分配给它们的动态 MessageGroupId,以便我可以控制处理消息的 lambda 工作人员的数量。我能想到的拥有动态 MessageGroupId 的唯一方法是使用 JsonPath 在 step 函数输入上引用一些参数,但是我没有遇到任何关于它的文档。我最初使用 JsonPath 动态传递 MessageGroupId 的测试失败了,只是传递了字符串“$.MessageGroupId”,有效地为每条消息提供了相同的消息组 id,从而为一个 lambda worker 提供了一个。

  1. 是否可以在从 step 函数发送消息时为 sqs 消息动态分配消息组 ID?
  2. 如果是,怎么做?

【问题讨论】:

  • 这里有同样的问题。尝试访问 MessageGroupId 字段中的 Context 对象 ($$) 和输入 ($) 对象将导致文字字符串。例如,将MessageGroupId 设置为$$.TaskToken 将导致字符串$$.TaskToken 而不是它的值。根据文档,您应该能够访问内在函数,因为 MessageGroupId 嵌套在 Parameters 字段中 - docs.aws.amazon.com/step-functions/latest/dg/… 但这也不起作用。

标签: amazon-web-services amazon-cloudformation aws-cdk aws-step-functions


【解决方案1】:

在 AWS Support 的帮助下,我设法通过使用 Context Object 或从初始输入中传递一个 ID 并使用 $ 引用它来做到这一点。

这是一个例子:

{
  "Comment": "Generate unique MessageGroupId",
  "StartAt": "Start",
  "States": {
    "Start": {
      "Type": "Task",
      "TimeoutSeconds": 60,
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "<YOUR_QUEUE_URL>",
        "MessageBody": {
          "Input.$": "$",
          "TaskToken.$": "$$.Task.Token"
        },
        "MessageGroupId.$": "$$.Execution.Id"
      },
      "ResultPath": "$",
      "End": true
    }
  }
}

我的问题是我试图像这样MessageGroupId

"MessageGroupId": "$$.Execution.Id"

我应该做的:

"MessageGroupId.$": "$$.Execution.Id"

附加.$ 将解析表达式"$$.Execution.Id",而不是直接输入字符串"$$.Execution.Id"

【讨论】:

  • 如果您使用的是 CDK,那么您需要提供 JsonPath.StringAt("$$.Execution.Id")。谢谢你的回答。
猜你喜欢
  • 2020-09-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-29
  • 1970-01-01
  • 2022-12-21
  • 2019-04-24
相关资源
最近更新 更多