【Question title】: ElasticSearch NEST: Bulk-indexing operation does not make use of specified document IDs
【Posted】: 2019-09-26 18:13:29
【Question】:

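I am currently using the ElasticSearch NEST 7.x library.

On the VM that hosts my ElasticSearch master node, I am running a web server that receives JSON data via REST. This JSON data is then persisted in ElasticSearch.

The received JSON data is first passed to this method for parsing: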

private static (bool Success, string ErrorMessage) TryReadRawJsonData(
    string rawJsonData, out IEnumerable<(string Index, ExpandoObject JsonContent)> jsonLines)
{
    var results = new List<(string Index, ExpandoObject JsonContent)>();

    foreach (string rawDataLine in HttpContext.Current.Server.UrlDecode(rawJsonData).Split('\n').Where(line => !string.IsNullOrWhiteSpace(line)))
    {
        dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(rawDataLine);

        if (!Dynamic.HasProperty(expandoObject, "IndexId"))
        {
            jsonLines = Enumerable.Empty<(string, ExpandoObject)>();
            return (Success: false, ErrorMessage: $"No field named 'IndexId' found in {rawDataLine}.");
        }

        string indexId = (string)expandoObject.IndexId.ToLower();
        results.Add((indexId, JsonContent: expandoObject));
    }

    jsonLines = results;
    return (Success: true, ErrorMessage: null);
}

If parsing succeeds, the returned value is then passed to this method for bulk indexing:

private static async Task<HttpResponseMessage> BulkIndexAsync(IEnumerable<(string Index, ExpandoObject JsonContent)> contents)
{
    foreach (var group in contents.GroupBy(line => line.Index))
    {
        BulkResponse bulkIndexResponse = 
            await ElasticClient.BulkAsync(bulk => bulk.Index(group.Key).IndexMany(group.Select(member => member.JsonContent)));

        if (bulkIndexResponse.Errors)
        {
            return new HttpResponseMessage(HttpStatusCode.BadRequest)
            {
                Content = new StringContent(bulkIndexResponse.ItemsWithErrors
                                                             .Select(itemWithError =>
                                                                 $"Index: {itemWithError.Index}; " +
                                                                 $"Document Id: {itemWithError.Id}; " +
                                                                 $"Error: {itemWithError.Error.Reason}.")
                                                             .ConcatenateIntoString(separator: "\n"))
            };
        }
    }
    return new HttpResponseMessage(HttpStatusCode.OK);
}

The bulk-indexing operation succeeds, but unfortunately the document IDs are not what I expect. Here is an example:

{
    "_index": "dummyindex",
    "_type": "_doc",
    "_id": "U1W4Z20BcmiMRnw-blTi",
    "_score": 1.0,
    "_source": {
        "IndexId": "dummyindex",
        "Id": "0c2d48bd-6842-4f15-b7f2-57fa259b0642",
        "UserId": "dummy_user_1",
        "Country": "dummy_stan"
    }
}

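As you can see, the Id field is 0c2d48bd-6842-4f15-b7f2-57fa259b0642, which, according to the documentation, should be inferred automatically as the document ID. However, the _id field is set to U1W4Z20BcmiMRnw-blTi instead of 0c2d48bd-6842-4f15-b7f2-57fa259b0642.

What am I doing wrong?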

【Comments】:

  • Which version of Elasticsearch are you using? Can you share a sample bulk request?

Tags: c# elasticsearch nest


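【Solution 1】:

The answer comes from here

The Id on the ExpandoObject is not a property of the type, but a key in the underlying IDictionary<string,object> that backs the ExpandoObject.

You can see this by reflecting over the properties of the ExpandoObject: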

dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(@"{
        ""IndexId"": ""dummyindex"",
        ""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
        ""UserId"": ""dummy_user_1"",
        ""Country"": ""dummy_stan""
    }
");

Type t = expandoObject.GetType();
PropertyInfo[] properties = t.GetProperties(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance);
foreach (PropertyInfo property in properties)
{
    Console.WriteLine(property.ToString());
}

which prints

System.Dynamic.ExpandoClass Class
System.Collections.Generic.ICollection`1[System.String] System.Collections.Generic.IDictionary<System.String,System.Object>.Keys
System.Collections.Generic.ICollection`1[System.Object] System.Collections.Generic.IDictionary<System.String,System.Object>.Values
System.Object System.Collections.Generic.IDictionary<System.String,System.Object>.Item [System.String]
Int32 System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.Count
Boolean System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.IsReadOnly

To solve your problem, you can specify the Id for each document by passing a second delegate argument to .IndexMany():

dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(@"{
        ""IndexId"": ""dummyindex"",
        ""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
        ""UserId"": ""dummy_user_1"",
        ""Country"": ""dummy_stan""
    }
");

var bulkResponse = client.Bulk(bu => bu
    .IndexMany(new[] { expandoObject }, (b, d) => b.Id((Id)d.Id))
);

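The cast of d.Id to Id is required because d is a dynamic type, and without the cast the runtime cannot dispatch the call. A cast to string would also work, since string is the actual type of the value; casting to Id simply uses the implicit conversion from string to Id.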
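
Applied to the BulkIndexAsync method from the question, the same idea might look like the sketch below. It assumes NEST 7.x and that each document carries its ID under the key "Id". BulkIndexer, GetDocumentId and IndexGroupAsync are hypothetical names introduced for illustration. The documents stay typed as ExpandoObject and are read through IDictionary<string, object>, so no dynamic dispatch is involved.

```csharp
using System;
using System.Collections.Generic;
using System.Dynamic;
using System.Globalization;
using System.Threading.Tasks;
using Nest;

public static class BulkIndexer
{
    // Hypothetical helper: reads the "Id" key from the dictionary that
    // backs the ExpandoObject; returns null when the key is absent.
    public static string GetDocumentId(ExpandoObject document)
    {
        var members = (IDictionary<string, object>)document;
        return members.TryGetValue("Id", out object value) && value != null
            ? Convert.ToString(value, CultureInfo.InvariantCulture)
            : null;
    }

    // Indexes one group of documents into the given index, setting the
    // _id of each bulk operation explicitly via the second IndexMany delegate.
    public static Task<BulkResponse> IndexGroupAsync(
        IElasticClient client, string index, IEnumerable<ExpandoObject> documents)
    {
        return client.BulkAsync(bulk => bulk
            .Index(index)
            .IndexMany(documents, (operation, doc) => operation.Id(GetDocumentId(doc))));
    }
}
```

Inside the question's loop, the call would then be something like `await BulkIndexer.IndexGroupAsync(ElasticClient, group.Key, group.Select(member => member.JsonContent))`. Documents without an "Id" key produce a null ID in this sketch, so it is worth validating them up front in the same way the question already validates IndexId.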
