【Question title】: Elasticsearch.Net 7 & Nest 7 - Invalid bulk update request
【Posted】: 2020-07-18 09:31:04
【Question】:

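I am trying to build a .NET Core (netcoreapp3.1) application that sends my data from Azure Event Hub to ES 7.

I am using the following packages:
Elasticsearch.Net 7.8.1
NEST 7.8.1

The data I retrieve from the Event Hub comes in 2 types, both of which inherit from IElasticBaseEntity.
I want to bulk update a list of these objects. Each one is either an object containing all the information or an object that updates a single field of an already indexed person.
The field used for matching/searching in ES is the id field.

To simplify my example, I will use these dummy classes: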

public interface IElasticBaseEntity
{
   string Id { get; set; }
   DateTime ServerTimestamp { get; set; }
}

The Person class is the one that contains all the information:

public abstract class Person : IElasticBaseEntity
{
   public string Id { get; set; }
   // Required by IElasticBaseEntity
   public DateTime ServerTimestamp { get; set; }
   public string Firstname { get; set; }
   public string Name { get; set; }
   public decimal? Score { get; set; }
}

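The ScoreUpdate class is the update I want to apply, based on Id, to an indexed Person:

// A class cannot contain a member with its own name, so the class is not called Score.
public abstract class ScoreUpdate : IElasticBaseEntity
{
   public string Id { get; set; }
   // Required by IElasticBaseEntity
   public DateTime ServerTimestamp { get; set; }
   public decimal? Score { get; set; }
}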

I use this method to set up the connection to ES:

public static IElasticClient CreateInstance(ElasticsearchConfig config)
{
   IConnectionPool pool;    
   var connection = new HttpConnection();

   pool = new SniffingConnectionPool(new[] { new Uri(config.Url) }) { SniffedOnStartup = true };

   var connectionSettings = new ConnectionSettings(pool, connection);
   connectionSettings.BasicAuthentication(config.Username, config.Password);
   connectionSettings.ServerCertificateValidationCallback(ServerCertificateValidationCallback);
   connectionSettings.RequestTimeout(TimeSpan.FromMinutes(2)).EnableHttpCompression();
   var client = new ElasticClient(connectionSettings);
    
   return client;
}

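So I started out using the Bulk command on the ElasticClient. In the past I could add objects with descriptor.Index, but of course I now want to update rather than insert/create everything.

So I came up with the following, but for some reason I keep getting an error in Visual Studio 2019 about an "Invalid /_bulk request", with no further information.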

IEnumerable<IElasticBaseEntity> list = RetrievedData();

var descriptor = new BulkDescriptor();
            
foreach (var eachDoc in list)
{
   var doc = eachDoc;
   descriptor.Update<IElasticBaseEntity>(i => i
      .Id(doc.Id)
      .Doc(doc)
      .DocAsUpsert(true));   
}

var response = await _client.BulkAsync(descriptor);

// Try to debug and see what was sent
if (response.ApiCall.RequestBodyInBytes != null)
{
   var jsonOutput = System.Text.Encoding.UTF8.GetString(response.ApiCall.RequestBodyInBytes);
}

The error I receive is the following (retrieved from response.DebugInformation):

Invalid NEST response built from a unsuccessful () low level call on POST: /persons-20200717/_bulk

# Invalid Bulk items:

# Audit trail of this API call:

  • [1] PingFailure: Node: https://myconnectiontoES:9243/ Exception: PipelineException Took: 00:00:01.2859155
  • [2] SniffOnFail: Took: 00:00:02.1577638
  • [3] SniffFailure: Node: https://myconnectiontoES:9243/ Exception: PipelineException Took: 00:00:02.0840985

# OriginalException: Elasticsearch.Net.ElasticsearchClientException: Failed sniffing cluster state.. Call: unknown resource

 ---> Elasticsearch.Net.PipelineException: Failed sniffing cluster state.
 ---> Elasticsearch.Net.PipelineException: An error occurred trying to read the response from the specified node.
   at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancelToken)
   --- End of inner exception stack trace ---
   at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancelToken)
   at Elasticsearch.Net.RequestPipeline.SniffOnConnectionFailureAsync(CancellationToken cancellationToken)
   at Elasticsearch.Net.Transport`1.PingAsync(IRequestPipeline pipeline, Node node, CancellationToken cancellationToken)
   at Elasticsearch.Net.Transport`1.RequestAsync[TResponse](HttpMethod method, String path, CancellationToken cancelToken, PostData data, IRequestParameters requestParameters)
   --- End of inner exception stack trace ---

# Audit exception in step 1 PingFailure:

Elasticsearch.Net.PipelineException: An error occurred trying to read the response from the specified node.
   at Elasticsearch.Net.RequestPipeline.PingAsync(Node node, CancellationToken cancelToken)

# Audit exception in step 3 SniffFailure:

Elasticsearch.Net.PipelineException: An error occurred trying to read the response from the specified node.
   at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancelToken)

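# Request:

<Request stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>

# Response:

<Response stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>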

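So there seems to be something wrong with my /_bulk request.

What I have tried:

  • Capturing the /_bulk request with Fiddler: the request is invalid and is never sent
  • Setting connectionSettings.DisableDirectStreaming(true); to print the request: the request body is always null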

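So if anyone can point out the mistake I made in building the /_bulk request, or point me in a direction for debugging this and retrieving more information, I would be very grateful. At the moment I am going around in circles, re-reading the documentation and searching Google, without any result.

Thanks

【Comments】: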

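  • To see what happens when calling Elasticsearch, check the response.DebugInformation property. You can read more about debugging ES calls with NEST in the docs. If you find anything interesting, please update your question :)

Tags: c# elasticsearch nest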
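
A minimal sketch of the kind of debugging setup that comment points to, assuming NEST 7.x. The class name DebugClientFactory and its Create method are hypothetical; the connection pool is passed in so that no particular pool type is assumed. Note that in the audit trail shown in the question the ping fails before the bulk call is made, which would likely explain why the captured request body stays null even with DisableDirectStreaming set.

```csharp
using System;
using System.Text;
using Elasticsearch.Net;
using Nest;

// Hypothetical helper, for illustration only.
public static class DebugClientFactory
{
    public static IElasticClient Create(IConnectionPool pool)
    {
        var settings = new ConnectionSettings(pool)
            // Buffer request and response bytes so they are available on the response.
            .DisableDirectStreaming()
            // Indent the JSON that is sent, which makes captured bodies easier to read.
            .PrettyJson()
            // Called for each completed API call with its details.
            .OnRequestCompleted(details =>
            {
                Console.WriteLine(details.DebugInformation);
                if (details.RequestBodyInBytes != null)
                {
                    Console.WriteLine(Encoding.UTF8.GetString(details.RequestBodyInBytes));
                }
            });

        return new ElasticClient(settings);
    }
}
```

Buffering every body costs memory, so settings like these are usually kept for troubleshooting rather than for production use.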


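【Answer 1】:

A failure occurs while trying to read the response to sniffing the cluster. Sniffing appears to happen because pinging the cluster failed, so it is worth checking that you can make a HEAD request to the base address of the Elasticsearch cluster using the credentials the client is configured with.

Based on port 9243 being used to connect to Elasticsearch, I suspect the Elasticsearch cluster is running in Elastic Cloud. SniffingConnectionPool must not be used with an Elasticsearch cluster running in Elastic Cloud, because the addresses returned in the sniff response for reaching the Elasticsearch nodes are internal addresses that the client cannot reach. Instead, CloudConnectionPool should be used; there are some convenience constructors for it when creating an instance of ElasticClient: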
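
One way to run that check outside of NEST is with a plain HttpClient. The following is a rough sketch only: the names ClusterPing, BasicToken and HeadAsync are hypothetical, and the custom certificate validation callback from the question is not reproduced here.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class ClusterPing
{
    // Builds the credential part of a Basic Authorization header.
    public static string BasicToken(string username, string password) =>
        Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));

    // Sends HEAD to the cluster base address and returns the HTTP status code.
    public static async Task<int> HeadAsync(Uri baseAddress, string username, string password)
    {
        using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(10) };
        using var request = new HttpRequestMessage(HttpMethod.Head, baseAddress);
        request.Headers.Authorization =
            new AuthenticationHeaderValue("Basic", BasicToken(username, password));

        using var response = await http.SendAsync(request);
        return (int)response.StatusCode;
    }
}
```

Called as `await ClusterPing.HeadAsync(new Uri(config.Url), config.Username, config.Password)`, a 200 would suggest the address and credentials are fine, while a 401 or 403 would point at the user or its roles rather than at the bulk request.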

var client = new ElasticClient(
    "<cloud id retrieved from the Elastic Cloud web console>", 
    new BasicAuthenticationCredentials(config.Username, config.Password));

CloudConnectionPool uses HTTP compression by default, but if you need more control over other configuration, such as the request timeout, you can use

var pool = new CloudConnectionPool(
    "<cloud id retrieved from the Elastic Cloud web console>", 
    new BasicAuthenticationCredentials(config.Username, config.Password));
    
var settings = new ConnectionSettings(pool)
    .RequestTimeout(TimeSpan.FromMinutes(2));

var client = new ElasticClient(settings);

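【Comments】:

    【Answer 2】:

    After going around in circles for a while, I started writing tests to check basic ES commands. I could not get any of the examples I found for Elasticsearch.Net to run successfully.

    So I tried checking the ES index with a curl command.
    curl -H "Content-Type: application/json" -u username:password -XPOST "https://elasticsearch_url:port/indexname/stats/1" -d "{\"application\": \"test\"}"

    This command failed with a message that I was not allowed to contact ES. So we checked the ES user, and apparently there was a mistake in the role assigned to that user (in Elasticsearch). Once we fixed that, I was able to run my code and perform partial updates.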

    IEnumerable<IElasticBaseEntity> list = RetrievedData();
    
    var descriptor = new BulkDescriptor();
            
    foreach (var eachDoc in list)
    {
       var doc = eachDoc;
       descriptor.Update<IElasticBaseEntity>(i => i
          .Id(doc.Id)
          .Doc(doc)
          .DocAsUpsert(true));   
    }
    
    var response = await _client.BulkAsync(descriptor);
    

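    The problem I have now is that it runs for about 1 hour and then I get a SniffFailure. The connection to ES is retried, but it mostly ends in "MaxTimeoutReached".

    RequestTimeout is set to 2 minutes. No retry count or retry timeout is set, so the defaults are used (that is, retrying until the 2-minute RequestTimeout is reached).

    As you can see in the log below, I am able to send documents to ES, and then at some point I get this error, mostly after my application has been running for 1 hour.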
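
    For reference, a sketch of how the retry and timeout behaviour described here could be set explicitly instead of relying on the defaults, assuming NEST 7.x. The class name RetrySettingsSketch is hypothetical and the values are illustrative only, not recommendations.

```csharp
using System;
using Elasticsearch.Net;
using Nest;

// Hypothetical helper, for illustration only.
public static class RetrySettingsSketch
{
    public static ConnectionSettings Create(IConnectionPool pool)
    {
        return new ConnectionSettings(pool)
            // Timeout for a single request attempt.
            .RequestTimeout(TimeSpan.FromSeconds(30))
            // Timeout for the ping that precedes a call to a node not yet known to be alive.
            .PingTimeout(TimeSpan.FromSeconds(5))
            // Upper bound on the number of retries for one API call.
            .MaximumRetries(3)
            // Upper bound on the total time spent retrying one API call.
            .MaxRetryTimeout(TimeSpan.FromMinutes(2));
    }
}
```

    With explicit bounds like these, a call to an unreachable cluster should fail sooner than in the log below, where a single attempt takes 00:01:40.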

    2020-08-05 11:22:01.5553| INFO| 001 ES_indexName processed documents 08/05/2020 11:22:01 1 in 202.3803 ms
    2020-08-05 11:29:28.9633| INFO| 001 ES_indexName processed documents 08/05/2020 11:29:28 1 in 179.7982 ms
    2020-08-05 11:40:08.4666| INFO| 001 ES_indexName processed documents 08/05/2020 11:40:07 1 in 291.5695 ms
    2020-08-05 11:47:26.2924|ERROR| failed Invalid NEST response built from a unsuccessful () low level call on POST: /ES_indexName/_bulk
    # Invalid Bulk items:
    # Audit trail of this API call:
     - [1] PingFailure: Node: https://ES_node1:port/ Exception: PipelineException Took: 00:01:40.0193513
     - [2] SniffOnFail: Took: 00:05:00.0132241
     - [3] SniffFailure: Node: https://ES_node2:port/ Exception: PipelineException Took: 00:01:40.0036955
     - [4] SniffFailure: Node: https://ES_node3:port/ Exception: PipelineException Took: 00:01:40.0019296
     - [5] SniffFailure: Node: https://ES_node1:port/ Exception: PipelineException Took: 00:01:40.0005639
     - [6] MaxTimeoutReached:
    # OriginalException: Elasticsearch.Net.ElasticsearchClientException: Maximum timeout reached while retrying request. Call: unknown resource
     ---> Elasticsearch.Net.PipelineException: Failed sniffing cluster state.
     ---> System.AggregateException: One or more errors occurred. (An error occurred trying to write the request data to the specified node.) (An error occurred trying to write the request data to the specified node.) (An error occurred trying to write the request data to the specified node.)
     ---> Elasticsearch.Net.PipelineException: An error occurred trying to write the request data to the specified node.
     ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
       at System.Net.Http.ConnectHelper.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean allowHttp2, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.CreateHttp11ConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.GetHttpConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
       at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpClient.FinishSendAsyncUnbuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
       at Elasticsearch.Net.HttpConnection.RequestAsync[TResponse](RequestData requestData, CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
     ---> (Inner Exception #1) Elasticsearch.Net.PipelineException: An error occurred trying to write the request data to the specified node.
     ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
       at System.Net.Http.ConnectHelper.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean allowHttp2, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.CreateHttp11ConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.GetHttpConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
       at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpClient.FinishSendAsyncUnbuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
       at Elasticsearch.Net.HttpConnection.RequestAsync[TResponse](RequestData requestData, CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancellationToken)<---
    
     ---> (Inner Exception #2) Elasticsearch.Net.PipelineException: An error occurred trying to write the request data to the specified node.
     ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
       at System.Net.Http.ConnectHelper.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean allowHttp2, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.CreateHttp11ConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.GetHttpConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
       at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpClient.FinishSendAsyncUnbuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
       at Elasticsearch.Net.HttpConnection.RequestAsync[TResponse](RequestData requestData, CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancellationToken)<---
    
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancellationToken)
       at Elasticsearch.Net.RequestPipeline.SniffOnConnectionFailureAsync(CancellationToken cancellationToken)
       at Elasticsearch.Net.Transport`1.PingAsync(IRequestPipeline pipeline, Node node, CancellationToken cancellationToken)
       at Elasticsearch.Net.Transport`1.RequestAsync[TResponse](HttpMethod method, String path, CancellationToken cancellationToken, PostData data, IRequestParameters requestParameters)
       --- End of inner exception stack trace ---
    # Audit exception in step 1 PingFailure:
    Elasticsearch.Net.PipelineException: An error occurred trying to write the request data to the specified node.
     ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
       at System.Net.Http.ConnectHelper.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean allowHttp2, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.CreateHttp11ConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.GetHttpConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
       at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpClient.FinishSendAsyncUnbuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
       at Elasticsearch.Net.HttpConnection.RequestAsync[TResponse](RequestData requestData, CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.PingAsync(Node node, CancellationToken cancellationToken)
    # Audit exception in step 3 SniffFailure:
    Elasticsearch.Net.PipelineException: An error occurred trying to write the request data to the specified node.
     ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
       at System.Net.Http.ConnectHelper.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean allowHttp2, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.CreateHttp11ConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.GetHttpConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
       at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpClient.FinishSendAsyncUnbuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
       at Elasticsearch.Net.HttpConnection.RequestAsync[TResponse](RequestData requestData, CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancellationToken)
    # Audit exception in step 4 SniffFailure:
    Elasticsearch.Net.PipelineException: An error occurred trying to write the request data to the specified node.
     ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
       at System.Net.Http.ConnectHelper.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean allowHttp2, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.CreateHttp11ConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.GetHttpConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
       at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpClient.FinishSendAsyncUnbuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
       at Elasticsearch.Net.HttpConnection.RequestAsync[TResponse](RequestData requestData, CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancellationToken)
    # Audit exception in step 5 SniffFailure:
    Elasticsearch.Net.PipelineException: An error occurred trying to write the request data to the specified node.
     ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
       at System.Net.Http.ConnectHelper.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean allowHttp2, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.CreateHttp11ConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.GetHttpConnectionAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
       at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
       at System.Net.Http.HttpClient.FinishSendAsyncUnbuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
       at Elasticsearch.Net.HttpConnection.RequestAsync[TResponse](RequestData requestData, CancellationToken cancellationToken)
       --- End of inner exception stack trace ---
       at Elasticsearch.Net.RequestPipeline.SniffAsync(CancellationToken cancellationToken)
    # Request:
    <Request stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>
    # Response:
    <Response stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>
     400039.9975 ms MUST RETRY
    

    【Comments】:
