【问题标题】:DocumentDB updating multiple documents failsDocumentDB 更新多个文档失败
【发布时间】:2016-03-15 11:25:23
【问题描述】:

我已经编写了一个存储过程,用于将 Type 属性添加到 DocumentDB 集合中的所有文档。不幸的是,仅更新一个文档后,存储过程就失败了。该集合包含大约 5000 个文档。

这是存储过程:

function updateSproc() {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
var response = getContext().getResponse();
var responseBody = {
    updated: 0,
    continuation: true,
    error: "",
    log: ""
};

// Validate input.
tryQueryAndUpdate();

// Recursively queries for a document by id w/ support for continuation tokens.
// Calls tryUpdate(document) as soon as the query returns a document.
function tryQueryAndUpdate(continuation) {
    var query = { query: "SELECT * FROM root c WHERE NOT is_defined(c.Type)", parameters: []};
    var requestOptions = { continuation: continuation};

    var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function(err, documents, responseOptions) {
        if (err) {
    responseBody.error = err;
    throw err;
        }

        if (documents.length > 0) {
            // If documents are found, update them.
            responseBody.log += "Found documents: " + documents.length;
            tryUpdate(documents);
        } else if (responseOptions.continuation) {
            responseBody.log += "Continue query";
            tryQueryAndUpdate(responseOptions.continuation);
        } else {
            responseBody.log += "No more documents";
            responseBody.continuation = false;
            response.setBody(responseBody);
        }

    });

    // If we hit execution bounds - throw an exception.
    if (!isAccepted) {
        responseBody.log += "Query not accepted";
        response.setBody(responseBody);
    }
}

// Updates the supplied document according to the update object passed in to the sproc.
function tryUpdate(documents)
{
    if (documents.length > 0) {
        responseBody.log += "Updating documents " + documents.length;

        var document = documents[0];

        // DocumentDB supports optimistic concurrency control via HTTP ETag.
        var requestOptions = { etag: document._etag};

        document.Type="Type value";

        // Update the document.
        var isAccepted = collection.replaceDocument(document._self, document, requestOptions, function(err, updatedDocument, responseOptions) {
           if (err) {
              responseBody.error = err;
              throw err;
           }

           responseBody.updated++;
           documents.shift();
           tryUpdate(documents);
        });

        // If we hit execution bounds - throw an exception.
        if (!isAccepted) {
            responseBody.log += "Update not accepted";
            response.setBody(responseBody);
        }
    } else {
        tryQueryAndUpdate();
    }
}}

根据返回的响应,我可以看到查询返回了 100 个文档。 tryUpdate 被调用了两次,但第二次调用 replaceDocument 不被接受。为什么要更新的文档很多时不接受?

【问题讨论】:

  • 我花了大约 20 分钟来查看这个,但我没有看到任何明显的东西。我看到的唯一一件小事不会引起你的问题。即使 isAccepted 为 false,您仍会调用 tryUpdate(),这意味着您可能会在 DocumentDB 告诉您它不再接受操作之后尝试调用 replaceDocument()。我可以尝试的唯一另一件事是将文档放在顶级函数范围内,然后将查询的回调更改为 results 而不是 documents 并在错误检查后添加 documents = results
  • 哦,看起来您的代码将永远不会看到调用 tryQueryAndUpdate 并继续的块,但这意味着您无法获得第二页,它不会导致您的第二个 replaceDocuments()打电话失败。我会在上面多做一些,也许我会亲自体验一下。
  • 哦,如果您将文档移动到顶级范围,请记住不要将文档传递到 tryUpdate。
  • 在 Azure 门户中运行查询“SELECT * FROM root c WHERE NOT is_defined(c.Type)”时,它使用了 700 多个 RU。这可能是第二次替换失败的原因吗?
  • 可能,尤其是在 S1(250 RU/秒)集合上运行。您在 c.Type 上有 -1 精度索引吗?我猜如果你这样做,它可以使用索引来完成查询。我已更改为不使用 .Type 字段,而是对于每种类型,我都有一个 Is=true (如果没有,则缺失)。然后我使用SELECT * FROM root c WHERE c.IsMyType。即使使用默认索引,这似乎也非常有效,但在您的情况下可能会更难,除非您的类型列表非常小。如果比较小,可以这样做:SELECT * FROM root c WHERE c.IsMyType1 OR c.IsMyType2...

标签: stored-procedures insert-update azure-cosmosdb


【解决方案1】:

根据我对同一问题的回答MSDN

是的,每个插入 700RUs +(估计)20RUs,在每秒只允许 250RUs 的集合上将是一个问题。查询是 700RU,因为您正在执行 NOT 操作,这实际上是一次扫描,因为它无法被索引。

所以有些事情可以尝试;

1) 更改逻辑以排除 NOT is_defined 检查,并可能按 _ts DESC 排序以首先获取最后更新的文档。这可能比做 NOT 检查便宜。然后你可以检查你得到的每个文档是否已经有一个 Type 属性,如果没有,添加一个并 ReplaceDocument

2) 您也可以在执行此操作时尝试将集合扩展到 S3,然后再次将其缩小到 S1。这将为您提供 2500 RU 供您使用。

3) 即使使用 S3,您仍然可能会遇到这种情况,它可能只是在比第二个文档多的文档之后发生。

因此,要解决此问题,我将在应用程序中执行查询以仅返回未定义属性的记录的 ID,

SELECT VALUE c.id FROM c WHERE NOT is_defined(c.Type)

将这些 id 粘贴到某种列表/数组中,然后从列表中 .Take() 项并作为数组传递给 sproc。现在让 sproc 循环遍历传递的数组,按 id 执行 ReadDocument,更新和替换以及递增计数器。

当 isAccepted 返回 false 时,将响应正文设置为计数器的值并返回调用代码。现在调用代码可以然后 Skip(counter).Take(x) 并再次调用 sproc。

查看this sample 的示例,了解如何通过存储过程进行批量插入。这显示了如何批处理记录、执行 sproc,并在 isAccepted == false 从响应正文之前获取 sproc 在该批处理中的当前位置。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-02-25
    • 1970-01-01
    • 1970-01-01
    • 2013-08-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-24
    相关资源
    最近更新 更多