【问题标题】:Most efficient way to query and update dynamoDb查询和更新 dynamoDb 的最有效方法
【发布时间】:2020-09-10 10:38:08
【问题描述】:

我有一个 dynamo DB 表,将用于存储失败的请求,稍后另一个 lambda 将要读取请求并重新处理它们。

此刻我正在使用 typescript CDK 创建这样的表格

const myTable = new dynamodb.Table(this, "my-table", {
      tableName: "my-table-name",
      partitionKey: { name: "file_id", type: dynamodb.AttributeType.STRING },
    });

我在 python lambda 中像这样将数据发送到表中

dynamodb = boto3.resource("dynamodb", region_name=region)
my_table = dynamodb.Table("my-table-name")

failedRecord = {
        "file_id": str(file_id),
        "processed": "false",
        "payload": str(payload),
    }

    my_table.put_item(Item=failedRecord)

现在我想从另一个 lambda 中对表中的所有条目进行处理 = false 我想读取它们,对它们做一些事情,然后更新它们的处理 = true。

是否需要在此处添加二级索引才能提高效率。一个如何做到这一点的例子会很棒。

谢谢

【问题讨论】:

    标签: python amazon-web-services nosql amazon-dynamodb


    【解决方案1】:

    考虑创建一个仅包含 个未处理项的全局二级索引。您可以通过添加/删除 GSI 主键在 GSI 中添加/删除项目。例如,考虑下面的表结构:

    请注意,只有 file_id 3 和 4 定义了 GSIPK。 GSI 在逻辑上应该如下所示:

    DynamoDB 只会将项目投影到该项目上存在 GSIPK 的索引中。您的 lambda 可以从 GSI 中读取,做一些工作,将 processed 属性设置为 true 并删除 GSIPK 值。这将有效地从二级索引中删除该项目。

    调用 DynamoDB 执行此操作的 update 如下所示:

     const params = {
        TableName: YOUR_TABLE_NAME_HERE,
        Key: {
          PK: FILE_ID_HERE
        },
        UpdateExpression: "SET #processed = :true REMOVE #gsipk",
        ExpressionAttributeNames: {
          "#processed": "processed",
          "#gsi1pk": "GSIPK",
        },
        ExpressionAttributeValues: {
          ":true": true
        }
      };
    
      ddbClient.update(params);
    

    【讨论】:

    • 这看起来像是我现在要尝试的一个有前途的解决方案,请问您是否认为这会比设置为全局二级索引并查询查找错误值然后更新它们更好的解决方案处理后为真?
    • 当然有很多方法可以解决这种访问模式。对已处理的索引也可以工作。我建议试一试,看看它如何适用于您的用例
    【解决方案2】:

    假设您的 filenote_id 已经是唯一的(应该假设您已将其设置为分区键),您共享的记录格式和表模式是 GSI,而不添加排序键不会有任何区别.

    您可以考虑的另一种方法是为相关表启用DynamoDB Stream 并将其设置为trigger of the second Lambda Function。 使用这种方法,您基本上可以捕获桌面上的所有活动,并且在您的逻辑中,您可以过滤掉所有不是 INSERT 的事件,并按照自己的节奏处理您感兴趣的事件。

    这样您就可以完全避免查询表。

    【讨论】:

    • 我认为流在这里不会起作用,我们希望按计划触发第二个 lambda,或者在旧服务再次可用时触发。如果我们一进去就触发 lambda,它们很可能会再次失败。
    • 我明白了。您是否有任何理由要使用 DynamoDB?例如,为什么不将这些失败的请求放入 SQS 队列中,并让第二个 lambda 函数在必要时轮询此队列,并仅将最终结果存储在 DynamoDB 中?
    • 您可以将实际的请求负载作为 JSON 存储在 S3 上,并仅将对象键放入 SQS 中的消息中。如果它们那么大,为了从 DynamoDB 中查询它们,您必须进行第一次查询以仅检索它们的 ID(基本上从选定属性中排除有效负载),然后使用 get_item 逐个处理它们。这样做的原因是 DynamoDB 的每个查询的大小上限为 4MB。
    • This is the pattern I was referring to,我知道这里的用例在 Java 中有详细说明,但您可以很容易地在 Python 中实现。
    • 我们使用 dynamo 而不是 SQS 的原因是因为我们不希望有来自队列的重复请求,如果 file_id=A144 的新失败请求来到 dynamo,我们可以更新数据库中的条目,而不是添加到 SQS 队列中,在该队列中,我们可能有多个请求来更新同一个文件注释,如果有意义的话,只有队列中的最后一个是相关的?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-10-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多