【问题标题】:CosmosDB - Mongodb IsUpsert not working for bulk updatesCosmosDB - Mongodb IsUpsert 不适用于批量更新
【发布时间】:2020-11-30 17:12:14
【问题描述】:

过去几个月,我们通过 .NET Core 和最新的 MongoDB.Driver Nuget 包 (2.11.0) 广泛使用 MongoDB API 和 CosmosDB (Server v3.6)。

批量插入和插入工作正常,但不幸的是,我无法使用 IsUpsert=true 模式进行批量操作。

注意:

  • 我们使用Polly 来管理速率限制。作为其中的一部分,我们处理 MongoWriteException, MongoExecutionTimeoutException, MongoCommandExceptionMongoBulkWriteExceptions
  • 分片/非分片集合都会出现此问题。

具体来说,给定一个非分片输入文档列表List<T> documents,以下工作正常:

  1. 批量插入:

    await Collection.BulkWriteAsync(documents.Select(s => new InsertOneModel<T>(s)),...)
    
  2. 批量更新:

    await Collection.BulkWriteAsync(documents.Select(s =>
      new ReplaceOneModel<T>(Builders<T>.Filter.Eq("Id", item.Id), item) { IsUpsert = false }),...)
    

不幸的是,如果某些文档是新文档,我们应该能够按原样使用上面的批量更新代码 - 但只需将 IsUpsert 标志设置为 true...但是,可惜,这不起作用。

具体来说,给定 50 个现有文档和 50 个新文档

  • 如果文档的 Id 类型为 ObjectId 作为主键,对于它处理的第一个新文档,CosmosDb 将使用 Id=ObjectId("000000000000000000000000") 错误地插入它 - 此时将不会插入/更新更多文档。在这种情况下:
    • BulkWriteResult 返回 MatchedCount=65, ModifiedCount=65, ProcessedRequests=100, RequestCount=100, Upserts=1, IsAcknowledged=true, IsModifiedCountAvailable=true, InsertedCount=0
    • 没有抛出异常。
    • 注意-数据库中只有51个文档,所以不能依赖BulkWriteResult
  • 如果文档的 Id 类型为 int 作为主键,那么 cosmos db 似乎
    • 在某个随机点放弃处理文档。这似乎更像是一种速率限制类型的场景...除非没有抛出异常
    • 例如,更新所有 50 个文档,但只插入了 8 个。在这种情况下,BulkWriteResult 返回 MatchedCount=50, ModifiedCount=50, ProcessedRequests=100, RequestCount=100, Upserts=8, IsAcknowledged=true, IsModifiedCountAvailable=true, InsertedCount=0

我错过了什么? ObjectId 场景似乎完全崩溃了;其他场景可以编码,但这里没有引发异常似乎不正确。

【问题讨论】:

    标签: mongodb azure azure-cosmosdb


    【解决方案1】:

    对于其他被此问题困扰的人 - 解决方法远非简单,但这是我最终要做的。

    • 主键为 ObjectId 的文档:只要您根据标识符是否为ObjectId.Empty。但是,您可能仍然需要处理下面提到的一个错误
    • 主键不是ObjectId 的文档:绝对是CosmosDb 中的一个错误,因为我无法在官方MongoDB 实现中重现这种情况。为了解决这个问题,我必须应用以下两种解决方法:
      • 抛出一个自定义异常并更新我现有的Polly 策略以重试未处理的请求,就像我通常如何处理通常由CosmosDB 抛出的其他MongoDB 速率限制异常一样。示例代码:
      BulkWriteResult<T> bulkWriteResult = await Collection
       .BulkWriteAsync(
           remainingWork,
           new BulkWriteOptions { BypassDocumentValidation = true },
           token);
      
      var actuallyProcessed = bulkWriteResult.DeletedCount + bulkWriteResult.InsertedCount +
                           bulkWriteResult.ModifiedCount + bulkWriteResult.Upserts?.Count;
      if (actuallyProcessed < bulkWriteResult.ProcessedRequests.Count)
      {
          // Off by one error: OCCASIONALLY, the last one processed is not actually processed
          // No way to detect this, unfortunately - hence the adjustment by 1
          actuallyProcessed = actuallyProcessed > 1 ? actuallyProcessed - 1 : 0;
          var processed = bulkWriteResult.ProcessedRequests.Take((int)actuallyProcessed)
              .ToList().AsReadOnly();
          var unprocessed = bulkWriteResult.ProcessedRequests.Skip((int)actuallyProcessed)
              .ToList().AsReadOnly();
          throw new CosmosDbRateLimitingBugException<T>(unprocessed, processed, bulkWriteResult);
      }
      
    • 关闭一个错误处理。不确定在纯 MongoDB 实现中是否需要这样做,但就像上面一样,您有时还必须将已处理的记录调整 1。注意:无论使用 'IsUpsert=true',此问题都适用。下面的代码略有简化,因为我使用Polly.Context 来跟踪异常和已处理/未处理的记录(未显示)。这里remainingWork 是必须向下一个BulkWriteAsync&lt;&gt; 调用发出的WriteModel&lt;T&gt; 请求。
    if (exception is MongoBulkWriteException<T> mostRecentException)
    {
        var unProcessedRequests =
            mostRecentException.UnprocessedRequests.ToList();
        if (mostRecentException.WriteErrors.Any())
        {
            //get processed requests (without success) that failed and add to remainingWork
            var requestWithError = new[]
                {
                    mostRecentException.Result.ProcessedRequests[
                        mostRecentException.WriteErrors[0].Index]
                };
            unProcessedRequests = unProcessedRequests.Concat(requestWithError).ToList();
        }
    
        remainingWork = unProcessedRequests.ToList();
    }
    else if (exception is CosmosDbRateLimitingBugException<T> cosmosDbBug)
    {
        remainingWork = cosmosDbBug.UnprocessedRequests;
    }
    

    【讨论】:

    • 仅供参考 - 尽管有上述解决方法,但 isUpsert=true 在 Azure 中不适用于非 ObjectIds 方案。
    • 感谢您的提问/回答,您为我节省了一些时间。很郁闷....
    猜你喜欢
    • 1970-01-01
    • 2018-05-26
    • 2022-10-13
    • 1970-01-01
    • 2014-08-24
    • 2017-03-19
    • 2011-05-25
    • 2013-06-18
    • 1970-01-01
    相关资源
    最近更新 更多