【问题标题】:Lambda scaling with SQS trigger使用 SQS 触发器进行 Lambda 扩展
【发布时间】:2021-06-16 13:47:03
【问题描述】:

我已经为我的 Lambda 定义了一个 SQS 触发器。在 Lambda 内部,我正在调用基于令牌(每分钟 250 个令牌)的第 3 方 api。最初我定义了250 的批处理大小,65s 的批处理窗口,但发生的事情是 lambda 工作concurrently 来处理请求并且令牌很快耗尽。

然后在更改批处理大小、窗口和并发的各种值之后,最终该过程开始顺利运行,批处理大小为 10,批处理窗口为 10,保留并发为 7,但当时队列中只有 3,00,000 个产品 ID。昨天,当我将 400 万个产品 ID 推送到队列时,代币再次开始很快耗尽。当我检查日志时,我发现 Lambda 会以不同的时间间隔选择不同数量的消息,例如有时一分钟需要 200 条消息,有时需要 400 条消息。这个数字每次都不同。

我想要的是,无论队列中有多少消息,Lambda 都应该在 1 分钟内从队列中挑选出 250 条消息。如何做到这一点?

【问题讨论】:

  • 批量大小为 10 时怎么会收到 200 或 400 条消息?
  • 是的,这就是问题所在。我已经上传了日志。你可以检查一下。
  • @Marcin 那些 200 和 400 是每分钟,而不是批量大小。如果您将批量大小和窗口都设置为 10,那么这些值似乎没问题。
  • 我可以计算的最大值是 350 条消息 = 7 并发 * 10 条批量消息 * 5 线程。但如果你的函数运行时间少于 1 分钟,那么你可以轻松超过 400 分钟。但无论如何,答案是你无法完全控制它。它取决于 AWS 如何汇集和扩展汇集。
  • 是的@Maurice。同样使用这种方法,lambda 选择的消息数量每次都不同。

标签: amazon-web-services aws-lambda amazon-sqs aws-serverless


【解决方案1】:

长话短说,您无法完全控制 lambda 池 SQS 的方式。 AWS 代表在SQS Lambda Trigger Polling Issue 中明确说明了这一点:

由于这完全依赖于 Lambda 服务,所以轮询机制无法控制

更重要的是,lambda 使用five pooling threads

Lambda 服务将开始使用 五个并行 长轮询连接轮询 SQS 队列。

因此,通过您的设置,您可以在一分钟内轻松获得多个池(取决于函数持续多长时间):

7 concurrency * 10 messages in batch * 5 threads * 6 pools per minute = 2100 per minute

作为AWS rep writes,解决此问题的唯一方法是不要直接使用 SQS 和 lambda:

缓解这种情况的唯一方法是在 Lambda 函数上禁用 SQS 触发器

【讨论】:

  • 7 并发 * 10 条消息批量 * 5 个线程 * 每分钟 6 个池 = 每分钟 2100 个池 在此什么是每分钟 6 个池?
  • @nats 它来自批处理窗口。你把它设置为 10 秒,所以你可以在一分钟内得到 6 个批次
【解决方案2】:

我认为 SQS 不是解决此类问题的合适产品。您正在寻找的是节流,而 SQS 可能不是解决此问题的正确工具。

例如。您将批量大小设置为 10,将窗口设置为 10。这并不意味着您认为它意味着什么。

您告诉 SQS 在最多 10 秒内最多批处理 10 个项目。但是如果 SQS 1 秒后有 10 个项目,它会触发你的 Lambda。

查看您的要求,您似乎将更多的数据放入队列中,而不是您可以从中读取的数据。

考虑到这一点,我建议您先将数据写入 DynamoDB,然后执行由 EventBridge 触发的作业,该作业每分钟运行一次,并从 DynamoDB 中准确提取 250 个项目(或您拥有的任何数量的令牌)并执行工作。

总结:

  1. 将您的项目放入 SQS
  2. 从 SQS 触发 Lambda A
  3. Lambda A 会将其写入 DynamoDB
  4. 创建 EventBridge 规则以每 60 秒触发一次 Lambda B
  5. Lambda B 从 DynamoDB 读取 n 项并处理它们

【讨论】:

  • 所以我正在做的是将产品 ID 从 RDS 获取到 SQS,然后是这个 lambda。
  • 如果您的数据已经在 RDS 中,您可能可以只使用 Lambda B 并尽可能直接从 RDS 读取。让事情变得更容易。
  • 是的,它会让事情变得更容易,但这种方法的唯一问题是我将如何更新每次调用中的限制和偏移量,以便它选择下一组记录。即使在第一次调用结束时,我仍然会在第二次触发时更新限制和偏移量,它将再次从 0 开始。
  • @nats 是的。那时可能有不同的挑战需要解决。一种选择可能是创建该表的临时副本并将其用作您的工作的源。然后,当 Lambda 处理 250 个项目时,它会从该临时表中删除它们。重复此过程,直到没有剩余项目,然后可以删除该表。这可能是解决这个问题的务实方法。但是对于我们在 SO 的我们来说,在信息有限的情况下很难找到合适的解决方案来解决您的问题。我想至少你现在知道你需要一些不同的东西;)
  • 是的,现在我知道该怎么做了。非常感谢。
猜你喜欢
  • 2019-06-02
  • 2021-12-01
  • 2020-05-17
  • 2020-07-28
  • 1970-01-01
  • 2021-03-28
  • 2021-11-28
  • 2019-03-26
  • 1970-01-01
相关资源
最近更新 更多