【问题标题】:Multiple worker to same queue messages多个工作人员到同一队列消息
【发布时间】:2022-02-06 17:12:51
【问题描述】:

我的任务是尽快将密钥(S3 对象)从一个 S3 帐户迁移到另一个 S3 帐户。我想在单个队列中发布密钥,但希望多个工作人员(在不同的 EC2 实例上运行)以并行方式使用来自同一队列的密钥。

假设我在队列中连续发布 1000 个密钥,并且有 5 个工作人员正在使用队列中的密钥。我希望每个不同的工作人员和所有进程键并行选择每 1000 个。

我不知道该怎么做,并区分哪个工人已经挑选了钥匙,哪个工人还没有挑选。

【问题讨论】:

  • 您是否希望使用 AWS Lambda 函数或在某处(例如在 Amazon EC2 实例或您自己的计算机上)运行的“工作人员”来处理消息?
  • 这些工作线程在不同的 EC2 实例上运行@JohnRotenstein

标签: node.js amazon-web-services amazon-s3 amazon-sqs


【解决方案1】:

你应该创建:

  • Amazon SQS 队列
  • 一个 AWS Lambda 函数,配置为在将消息发送到 SQS 队列时触发,并将处理消息中提到的键
  • 将密钥作为消息“推送”到 SQS 队列中的东西

当消息可用时,Amazon SQS 队列将自动触发 AWS Lambda 函数。它最多可以向每个被调用的 Lambda 函数传递 10 条消息,默认情况下最多可以同时运行 1000 个 Lambda 函数。

每个“消息”可以包含一个键或多个键 - 由您决定。 Lambda 函数只需要知道如何处理您发送的消息。

Lambda 函数将接收event['Records'] 列表(数组)中的消息。它应该处理这些消息,然后退出函数。这将导致这些消息从队列中删除。如果函数没有成功退出(例如产生错误),消息会自动重新出现在队列中进行重新处理。

【讨论】:

    【解决方案2】:

    如果您在 Amazon EC2 实例上运行工作程序,则每个工作程序都应执行以下操作:

    • 永远循环,执行以下步骤:
      • 使用 WaitTimeSeconds = 20 调用 receiveMessage() -- 这将等待最多 20 秒,然后返回空响应,从而减少工作人员调用队列的次数
      • 循环遍历返回的Messages数组
      • 对于数组中的每条消息,处理消息然后调用deleteMessage(),传递消息的ReceiptHandle

    每个工作人员每次ReceiveMessage() 调用最多可以请求 10 条消息。

    在消息中添加什么内容由您决定——它可能是单个键,也可能是多个键。您应该对您的工作人员进行编码以了解如何解释消息。

    因此,如果您要求每个工作人员获得 1000 个密钥,则您应该在一条消息中发送 1000 个密钥,并将每个工作人员配置为在调用 receiveMessage() 时使用 MaxNumberOfMessages = 1。但是,我建议您使用较小的数量,以便在工人之间更均匀地分配工作。

    【讨论】:

      猜你喜欢
      • 2014-06-05
      • 1970-01-01
      • 1970-01-01
      • 2014-08-23
      • 1970-01-01
      • 2021-04-17
      • 2016-11-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多