【问题标题】:Amazon SQS unique messageAmazon SQS 唯一消息
【发布时间】:2016-03-05 17:06:14
【问题描述】:

我使用 SQS 作为视频编码队列,并希望确保每个视频只执行一次编码。

SQS 工作正常,因为当消息排队时,它只会被单个线程接收。但是,对于相同的视频/编码,可能会向队列发送多条消息,这意味着特定“编码”队列的消息内容是相同的。

是否有去重以确保对于特定队列,队列中的消息或从队列接收的消息是唯一的?

我认为一个选项是在发送消息时为每种编码类型创建一个新队列。所以队列可以命名为encoding-video-id,它只有一条消息,我可以检查以确保队列不存在。唯一的“问题”是创建的数千个这样的队列可能有 1000 到 10 个。

【问题讨论】:

  • 那么是什么导致您将同一条消息多次排队?
  • 用例是用户可以提交'encode'来排队视频,在极端情况下可能会被多次点击,这将导致多条消息。
  • 刚刚注意到您可以在 sqs 中创建“无限”队列,因此上述选项可能会起作用。
  • 即使没有用户排队重复任务的可能性,SQS 本身也不能保证“恰好一次”传递消息。它保证“至少一次”,因此 SQS 本身可以传递重复的消息。我认为这些问题的答案与您的问题有关:stackoverflow.com/questions/32386877/…stackoverflow.com/questions/13484845/…
  • @mbaird 我认为这将成为需要做的事情。基本上在redis中使用原子操作并在其上设置较低的TTL(在处理时更新)。可以简单地使用基于视频 guid 的具有唯一键的 INCR 并检查它是否存在。如果这上面的 TTL 是 20 秒,SQS 上的 TTL 是 1m,两者都在每 10 秒处理一次作业时更新,我认为这应该可以解决重复数据删除的问题并允许重试 SQS。

标签: amazon-web-services amazon-sqs


【解决方案1】:

SQS Has a Deduplication ID Property。在 5 分钟窗口内发送的具有相同重复数据删除 ID 的消息将被成功接收,但实际上并未添加到队列中。

您可以使用它来防止对同一视频进行额外排队。

这会增加一些复杂性,即使消息已处理,具有相同重复数据删除 ID 的其他消息也不会排队,直到窗口结束。同样,如果您在窗口结束后发送相同的 ID,则消息将再次排队,这也可能是不希望的。

但是,重复数据删除 ID 应该授予您请求的行为,而不是维护您自己的排队视频缓冲区。

【讨论】:

    【解决方案2】:

    有一种方法可以在从队列接收数据后仅检查唯一消息。我将在下面解释。

    假设您经常向单个 SQS 队列添加随机消息(不考虑任何 id 或任何内容)。逻辑是在从队列接收消息的时候。

    在创建 ReceiveMessageRequest 对象时,您可以指定 AttributeNames。因此,将“ApproximateReceiveCount”属性添加到请求对象。这将获取“Ap​​proximateReceiveCount”值以及从 SQS 队列中获取的每条消息。

    现在,对于第一次读取的消息,“ApproximateReceiveCount”为 1。否则该值将大于 1。因此,您可以在每次执行 SQS 读取时只考虑这些消息。只需通过设置请求对象的“MaxNumberOfMessages”属性来限制每次读取的最大消息数,以确保每次读取不会获得巨大的有效负载(有效负载的每个 64 KB 块计费为 1 个请求) .

    我知道,FIFO 队列在某些情况下会做得更好。但是,它几乎没有限制-

    • 吞吐量有限(每秒只有 300 个事务 (TPS))
    • 目前它仅支持两个区域(美国西部(俄勒冈)和美国东部(俄亥俄)区域)

    请在下面找到解释逻辑的 C# 代码-

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using Amazon.SQS;
    using Amazon.SQS.Model;
    
    namespace DriverDataPooler1
    {
        class Program
        {
            AmazonSQSClient objClient = new AmazonSQSClient
                    ("<AWSAccessKeyId>", "<AWSSecretAccessKey>", Amazon.RegionEndpoint.APSouth1);
            //Create New SQS Queue
            CreateQueueResponse queueResponse = new CreateQueueResponse();
            ListQueuesResponse objqueuesResponseList = new ListQueuesResponse();
    
            // Declare the request and response objects
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
            ReceiveMessageResponse receiveMessageResponse = new ReceiveMessageResponse();
    
            static void Main(string[] args)
            {
                Program p1 = new Program();
                p1.getQueueData();
            }
    
            public void getQueueData(){
    
                objqueuesResponseList = objClient.ListQueues(new ListQueuesRequest());
                List<String> QueueList = objqueuesResponseList.QueueUrls;
    
    
    
                // Receive Message from SQS Queue
                if (QueueList.Any())
                {
                    // I am only considering the first queue here as I have only one SQS queue
                    receiveMessageRequest.QueueUrl = QueueList[0];
                    receiveMessageRequest.WaitTimeSeconds = 20;
    
                    //You can limit t6he number of messages to decrease the mayload amount (depends on the size of each message) 
                    receiveMessageRequest.MaxNumberOfMessages = 10;
                    receiveMessageRequest.AttributeNames = new List<string>() { "ApproximateReceiveCount" };
                    receiveMessageResponse = objClient.ReceiveMessage(receiveMessageRequest);
                    List<Message> result = receiveMessageResponse.Messages;
                    if (result.Any())
                    {
                        foreach (Message res in result)
                        {
                            // Checking for the messages that are read for the first time
                            if (Int16.Parse(res.Attributes["ApproximateReceiveCount"]) == 1)
    
                                // Process you messages here 
                                Console.WriteLine(res.Body);
                        }
                    }
                    else
                    {
                        Console.WriteLine("You have no new messages in your SQS");
                    }
                }
                else
                {
                    Console.WriteLine("You have no available SQS");
                }
                Console.ReadKey();
    
            }
        }
    }
    

    如果您有任何进一步的疑问,请发表评论。

    【讨论】:

      【解决方案3】:

      没有办法确保 SQS 队列中消息的唯一性或排序。此外,排队太多也不是一个好主意。

      在我看来,您需要在系统中添加另一个组件。某种元数据服务就足够了。它可以像这样工作:

      • 创建编码任务时(在将其添加到 SQS 之前),您可以将其写入元数据服务。
      • 当工作人员收到编码任务时,它会查询元数据服务以查看该任务是否已经完成
      • 当工作人员完成编码任务时,它会在元数据服务中将该任务标记为已完成

      如果您将这些编码作业的输出上传到 S3,您可以有效地将 S3 本身用作元数据服务。如果每个视频都有唯一的名称/ID,您可以使用此唯一 ID 的键将输出保存在 S3 中。或者将其设置为 S3 元数据键值(这会使文件更难找到,因为您不能只查询 S3 元数据服务)。然后,当worker收到编码任务时,它会检查该文件是否已经存在于S3上,在这种情况下,它会从SQS中删除消息并跳过该任务。

      如果您不将输出保存到 S3,您可能需要使用某种数据库。 Dynamo DB 在速度和成本方面可能会有所帮助。

      希望这会有所帮助! :)

      【讨论】:

      • 所以这绝对是我想到的,但这里有一个场景,由于各种原因,主动编码可能会失败(服务器故障、编程异常等)。 SQS 的好处是我们可以使用 Visibility Timeout 并在它被编码时更新它(对于长编码),但是如果任何无法处理的东西死掉了,这个消息现在可以被重新处理,因为它应该。但是,如果我们有另一个服务检查作业是否存在,这将阻止它被重新处理。
      • 一次选项可能是在文档上使用带有 TTL 的 MongoDB 并更新 TTL,就像更新可见性超时一样。但是,如果它超过了 SQS 的 VT,那么再次收到的消息将会丢失,因为我们会将那些重复的消息标记为已删除。
      • 当然,生存超时和更新是 SQS 对于这种批处理用例如此出色的原因。但是我并不完全了解您的第一个 cmets 您不会阻止任何东西被重新处理,而且您当然不需要整个“其他服务”来检查工作是否存在。您只需在收到任务后立即检查您的元服务。如果任务成功执行,您只会将任务标记为已完成。所以没有什么可以阻止失败的任务被重新处理
      • 嗯,我认为这不会解决防止处理多条消息的问题。如果我们只是检查任务是否已经完成,那么仍然有可能同时运行多个任务。
      • 哦我明白了,你想防止多个工人同时运行同一个任务。那么你肯定需要使用像 Dynamo 这样的东西。当每个工作人员收到一条消息时,他们会使用唯一标识符更新 dynamo,以说明该任务已在处理中。然后,当另一个工作人员出现并收到同一任务的消息时,它将使用唯一 ID 查询 dynamo 并查看它是否已被处理。把它想象成一个锁定机制。
      【解决方案4】:

      您建议的解决方案是一个糟糕的设计,无论是否可能。以下是我解决问题的方法。

      我将使用一个数据库(可能是 DynamoDB)来存储一个基于视频编码类型的唯一 ID,并且我将添加一个名为 status 的列。一旦用户单击转换按钮,首先,我将检查数据库。如果项目不可用,将向数据库推送一条状态为“正在转换”的新记录。然后将工作推入 SQS。处理完工作负载后,将数据库的状态更改为“已完成”。如果用户再次单击转换按钮,则根据数据库中的状态变量显示结果。

      【讨论】:

      • 如果作业失败并且 SQS 需要重试会发生什么。再次收到新消息时,dynamo 中的状态将为“正在转换”。
      • 它可以由队列工作者处理。队列工作者将从队列中获取任务并开始转换。如果出现问题,您可以处理异常。您的工作仍将在队列中,因为您尚未将其删除。因此队列工作者将再次尝试相同的工作,直到成功。工作成功后,您可以从队列中删除消息并更新数据库。但要注意队列的可见性超时,以免重复工作。
      • 好吧,这种情况下并不是所有的异常都可以处理。但是,我认为可以做的是,由于每条消息都有一个唯一的消息 id,无论您收到多少次。该消息 ID 可以与视频 guid 相关联。收到消息后,它会检查该视频 guid 是否存在,如果存在且消息 id 不匹配,则它是重复的……这可能有效。
      • 但是,由于同一条消息可能会被多次接收......也许它不是防弹的。
      • 我不知道你的转换过程是如何工作的,以及为什么你不能处理异常。我建议的是更正确的架构方式。即使您没有处理异常,除非您从队列中删除您的工作,否则只有一个项目会在那里,对吗?因此,您可以毫不费力地利用这一优势。
      【解决方案5】:

      IMO,创建无限数量的队列,每个队列中只有一条消息是一个非常糟糕的设计,即使理论上它会起作用。

      如果是我,我会尝试确保每个视频都有某种唯一标识符,即使用户“双击”进程按钮也是如此。

      我会设想一个系统,其中具有唯一名称(例如 guid)的视频被上传到 S3,一条消息被放入队列中,您的线程从队列中提取消息并进行编码,然后写入视频返回到不同的 S3 存储桶,但具有相同的基本名称。

      在处理任何视频之前,我会先检查“输出存储桶”,看看那里是否已经存在具有匹配名称的编码视频,如果是 - 我会跳过重新处理并删除消息。

      如果一切都在 EC2 本地磁盘上运行(并且您没有使用 S3),那么可以使用硬盘上的输入和输出目录来完成相同的操作(但这会假设多台机器没有执行处理。

      重要的是要记住,SQS 可能会传递相同的消息 - 即使用户只提交了一次。虽然很少发生,但无论您设置什么系统,您都需要确保是否/何时确实获得了偶尔的重复,它不会破坏任何东西。

      【讨论】:

      • 所以每个视频都有一个唯一的 guid,不幸的是我们不在 AWS 中,所以一些更理想的工作流程将无法工作。但即使使用唯一的 guid,检查是否存在已编码的视频也不起作用,因为在视频被编码之前它可能需要一些时间才能显示出来。理想情况下,有一种机制可以在原子庄园中说“这个视频指南是在队列中还是正在处理中”。我们当然可以使用其他服务或数据库,但它与 SQS 的耦合并不像我想要的那样紧密,可能会出现误报,这是我使用其他队列方法所经历的。
      猜你喜欢
      • 2014-11-10
      • 2015-09-09
      • 1970-01-01
      • 2012-06-09
      • 1970-01-01
      • 2010-12-12
      • 2013-09-05
      • 1970-01-01
      • 2023-03-23
      相关资源
      最近更新 更多