【问题标题】:ElasticSearch Nest BulkAll halted after receiving failures that can not be retried from _bulkElasticSearch Nest BulkAll 在收到无法从 _bulk 重试的故障后停止
【发布时间】:2019-01-24 14:22:10
【问题描述】:

使用BulkAll() 批量插入我收到这个奇怪的错误

BulkAll halted after receiving failures that can not be retried from _bulk

但是,当我检查异常时,我仍然得到了成功的响应:

Successful low level call on POST: /cf-lblogs-2019.01.23/cloudflareloadbalancinglogelasticentity/_bulk?

我在这里做错了什么?下面是sn-p的代码:

var waitHandle = new CountdownEvent(1);

var bulk = _client.BulkAll(group.ToList(), a => a
                .Index(_index.Replace("*", string.Empty) + group.Key)
                .BackOffRetries(2)
                .BackOffTime("30s")
                .RefreshOnCompleted(true)
                .MaxDegreeOfParallelism(4)
                .Size(group.Count()));

bulk.Subscribe(new BulkAllObserver(
                onNext: response => _logger.LogInformation($"Indexed {response.Page * group.Count()} with {response.Retries} retries"),
                onError: HandleInsertError,
                onCompleted: () => waitHandle.Signal()
            ));

waitHandle.Wait();


private void HandleInsertError(Exception e)
    {
        var exceptionString = e.ToString(); 
        _logger.LogError(exceptionString);
    }

巢 6.4.2.

弹性 6.5.4.

【问题讨论】:

    标签: c# elasticsearch nest


    【解决方案1】:

    这意味着BulkAll observable 无法索引一个或多个因无法重试而失败的文档。

    默认情况下,无法索引的文档的重试谓词是当一个项目返回 429 的 HTTP 响应状态代码时,即尝试同时索引超过集群能够处理的文档。

    查看BulkAll() 设置会发现两件事:

    1. var bulk = _client.BulkAll(group.ToList(), a => a

      group.ToList() 将立即评估所有文档并将它们缓冲在内存中的List<T> 中。为提高效率,您通常希望在批量索引时惰性枚举大型集合。如果group 是可以传递给BulkAllIEnumerable<T>,那么就传递它。

    2. .Size(group.Count()));

      这将尝试在一个批量请求中发送所有文档BulkAll 的想法是它会同时发送多个批量请求,并一直这样做,直到所有文档都被索引。

      大小应该为每个请求设置一个合理的大小;您可以通过计算每个文档的平均字节数来计算合理的字节大小,然后从小于 5MB 的某个地方开始,或者您可能希望从每个请求 1000 个文档开始并评估索引速度是否足以满足您的需求或者如果您开始收到 429 个回复。当后者开始发生时,这很好地表明您已接近要索引的文档的集群的索引限制阈值。

    【讨论】:

      【解决方案2】:

      在我的情况下,我已经解决了如下:

              List<string> errors = new List<string>();
              int seenPages = 0;
              int requests = 0;
              CancellationTokenSource tokenSource = new CancellationTokenSource();
              ConcurrentBag<BulkResponse> bulkResponses = new ConcurrentBag<BulkResponse>();
              ConcurrentBag<BulkAllResponse> bulkAllResponses = new ConcurrentBag<BulkAllResponse>();
              ConcurrentBag<items> deadLetterQueue = new ConcurrentBag<items>();
              BulkAllObservable<items> observableBulk = elasticClient.BulkAll(lst, f => f
                      .MaxDegreeOfParallelism(Environment.ProcessorCount)
                      .BulkResponseCallback(r =>
                      {
                          bulkResponses.Add(r);
                          Interlocked.Increment(ref requests);
                      })
                      .ContinueAfterDroppedDocuments()
                      .DroppedDocumentCallback((r, o) =>
                      {
                          errors.Add(r.Error.Reason);
                          deadLetterQueue.Add(o);
                      })
                      .BackOffTime(TimeSpan.FromSeconds(5))
                      .BackOffRetries(2)
                      .Size(1000)
                      .RefreshOnCompleted()
                      .Index(indeksName)
                      .BufferToBulk((r, buffer) => r.IndexMany(buffer))
                  , tokenSource.Token);
      
              try
              {
                  observableBulk.Wait(TimeSpan.FromMinutes(15), b =>
                  {
                      bulkAllResponses.Add(b);
                      Interlocked.Increment(ref seenPages);
                  });
              }
              catch (Exception e)
              {
                  Console.WriteLine("Exxx => " + e.Message);
              }
              foreach (var err in errors)
              {
                  Console.WriteLine("Error : " + err);
              }
      

      我希望它可以帮助其他有这个问题的人。

      【讨论】:

        【解决方案3】:

        我不怀疑 Russ Cams 的回答是正确的,但在另一种情况下可能会出现此错误。

        如果您使用类似的东西在 Elastic Search 中实现了 API 安全性

        POST /_security/api_key
        {
        "name":"my-api-key",
        "role_descriptors": {
           "admin": {
               "cluster":["all"],
               "index": [
                  {
                    "names":["my-index", "my-other-index"],
                    "privileges": ["all"]
                  }
                ]
             }
           }
        }
        

        并且您没有为这些索引使用正确的 ApiKey,或者正在尝试使用生成的 ApiKey 创建并将 bulkAll 放入另一个没有为其定义 ApiKey 的索引。

        而不是在删除/创建调用期间收到错误..

        await client.Indices.DeleteAsync(IndexName);
        await client.Indices.CreateAsync(IndexName, MutateCreateIndexDescriptor);
        

        您将收到您在问题中指定的错误。

        BulkAll halted after receiving failures that can not be retried from _bulk
        

        堆栈跟踪错误地指向:

        bulkAll.Wait(... , ...);
        

        如果已定义。

        这可能是 NEST 中的一个错误。在版本中体验过。 7.8.0 希望这对偶然发现它的人有所帮助。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2018-08-10
          • 1970-01-01
          • 1970-01-01
          • 2016-08-30
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-04-22
          相关资源
          最近更新 更多