【问题标题】:Scroll example in ElasticSearch NEST APIElasticSearch NEST API 中的滚动示例
【发布时间】:2015-07-09 20:42:08
【问题描述】:

我正在使用 .From() 和 .Size() 方法从 Elastic Search 结果中检索所有文档。

以下是示例 -

ISearchResponse<dynamic> bResponse = ObjElasticClient.Search<dynamic>(s => s.From(0).Size(25000).Index("accounts").AllTypes().Query(Query));

最近我遇到了 Elastic Search 的滚动功能。这看起来比专门用于获取大数据的 From() 和 Size() 方法更好。

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

我正在寻找有关 NEST API 中滚动功能的示例。

有人可以提供 NEST 示例吗?

谢谢, 萨米尔

【问题讨论】:

标签: elasticsearch nest


【解决方案1】:

这是一个在 NEST 和 C# 中使用滚动的示例。适用于 5.x 和 6.x

public IEnumerable<T> GetAllDocumentsInIndex<T>(string indexName, string scrollTimeout = "2m", int scrollSize = 1000) where T : class
      {
          ISearchResponse<T> initialResponse = this.ElasticClient.Search<T>
              (scr => scr.Index(indexName)
                   .From(0)
                   .Take(scrollSize)
                   .MatchAll()
                   .Scroll(scrollTimeout));

          List<T> results = new List<T>();

          if (!initialResponse.IsValid || string.IsNullOrEmpty(initialResponse.ScrollId))
              throw new Exception(initialResponse.ServerError.Error.Reason);

          if (initialResponse.Documents.Any())
              results.AddRange(initialResponse.Documents);

          string scrollid = initialResponse.ScrollId;
          bool isScrollSetHasData = true;
          while (isScrollSetHasData)
          {
              ISearchResponse<T> loopingResponse = this.ElasticClient.Scroll<T>(scrollTimeout, scrollid);
              if (loopingResponse.IsValid)
              {
                  results.AddRange(loopingResponse.Documents);
                  scrollid = loopingResponse.ScrollId;
              }
              isScrollSetHasData = loopingResponse.Documents.Any();
          }

          this.ElasticClient.ClearScroll(new ClearScrollRequest(scrollid));
          return results;
      }

来自:http://telegraphrepaircompany.com/elasticsearch-nest-scroll-api-c/

【讨论】:

    【解决方案2】:

    NEST Reindex 的内部实现使用滚动将文档从一个索引移动到另一个索引。

    这应该是一个很好的起点。

    您可以在下面找到来自github 的有趣代码。

    var page = 0;
    var searchResult = this.CurrentClient.Search<T>(
        s => s
            .Index(fromIndex)
            .AllTypes()
            .From(0)
            .Size(size)
            .Query(this._reindexDescriptor._QuerySelector ?? (q=>q.MatchAll()))
            .SearchType(SearchType.Scan)
            .Scroll(scroll)
        );
    if (searchResult.Total <= 0)
        throw new ReindexException(searchResult.ConnectionStatus, "index " + fromIndex + " has no documents!");
    IBulkResponse indexResult = null;
    do
    {
        var result = searchResult;
        searchResult = this.CurrentClient.Scroll<T>(s => s
            .Scroll(scroll)
            .ScrollId(result.ScrollId)
        );
        if (searchResult.Documents.HasAny())
            indexResult = this.IndexSearchResults(searchResult, observer, toIndex, page);
        page++;
    } while (searchResult.IsValid && indexResult != null && indexResult.IsValid && searchResult.Documents.HasAny());
    

    您也可以查看integration test for Scroll

    [Test]
    public void SearchTypeScan()
    {
        var scanResults = this.Client.Search<ElasticsearchProject>(s => s
            .From(0)
            .Size(1)
            .MatchAll()
            .Fields(f => f.Name)
            .SearchType(SearchType.Scan)
            .Scroll("2s")
        );
        Assert.True(scanResults.IsValid);
        Assert.False(scanResults.FieldSelections.Any());
        Assert.IsNotNullOrEmpty(scanResults.ScrollId);
    
        var results = this.Client.Scroll<ElasticsearchProject>(s=>s
            .Scroll("4s") 
            .ScrollId(scanResults.ScrollId)
        );
        var hitCount = results.Hits.Count();
        while (results.FieldSelections.Any())
        {
            Assert.True(results.IsValid);
            Assert.True(results.FieldSelections.Any());
            Assert.IsNotNullOrEmpty(results.ScrollId);
            var localResults = results;
            results = this.Client.Scroll<ElasticsearchProject>(s=>s
                .Scroll("4s")
                .ScrollId(localResults.ScrollId));
            hitCount += results.Hits.Count();
        }
        Assert.AreEqual(scanResults.Total, hitCount);
    }
    

    【讨论】:

    • 谢谢。不支持聚合查询查询类型“扫描”。那么在没有搜索类型“扫描”的情况下使用 Scroll 仍然很好吗?
    • 是的,但是滚动效率会降低elastic.co/guide/en/elasticsearch/guide/current/…。取决于您的用例。
    【解决方案3】:

    我冒昧地将 Michael 的好答案改写为异步且不那么冗长(v. 6.x Nest):

    public async Task<IList<T>> RockAndScroll<T>(
        string indexName,
        string scrollTimeoutMinutes = "2m",
        int scrollPageSize = 1000
    ) where T : class
    {
        var searchResponse = await this.ElasticClient.SearchAsync<T>(sd => sd
            .Index(indexName)
            .From(0)
            .Take(scrollPageSize)
            .MatchAll()
            .Scroll(scrollTimeoutMinutes));
    
        var results = new List<T>();
    
        while (true)
        {
            if (!searchResponse.IsValid || string.IsNullOrEmpty(searchResponse.ScrollId))
                throw new Exception($"Search error: {searchResponse.ServerError.Error.Reason}");
    
            if (!searchResponse.Documents.Any())
                break;
    
            results.AddRange(searchResponse.Documents);
            searchResponse = await ElasticClient.ScrollAsync<T>(scrollTimeoutMinutes, searchResponse.ScrollId);
        }
    
        await this.ElasticClient.ClearScrollAsync(new ClearScrollRequest(searchResponse.ScrollId));
    
        return results;
    }
    

    【讨论】:

    • 几个 cmets:在scrollTimeoutMilliseconds = "2m" 中,2m 表示分钟,而不是毫秒;见here。这个参数的确切作用的解释是here。您可能还想返回 Task&lt;ICollection&lt;T&gt;&gt; 而不是 Task&lt;IEnumerable&lt;T&gt;&gt;,因为结果已经在列表中。
    • 啊,感谢@TobiasJ 的更正。我已经重命名了变量。至于ICollection,它可能返回一个IList&lt;T&gt; 更具描述性,它实际上是一个可以通过索引访问的ICollection。也更正了
    猜你喜欢
    • 2014-11-12
    • 2022-01-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-26
    • 1970-01-01
    • 1970-01-01
    • 2018-06-08
    相关资源
    最近更新 更多