【问题标题】:Periodically process and update documents in elasticsearch index定期处理和更新 elasticsearch 索引中的文档
【发布时间】:2022-01-12 18:21:37
【问题描述】:

我需要制定一个策略来定期高效地处理和更新弹性搜索索引中的文档。我不必查看我之前处理过的文件。

我的设置是我有一个长时间运行的进程,它不断地将文档插入索引,比如说大约。每小时 500 个文档(想想常见的日志记录示例)。

我需要找到一种解决方案来定期更新一些文档(例如通过 cron 作业)以在特定字段(例如文本字段)上运行一些代码,以使用许多新字段来增强该文档。我想这样做是为了在索引上提供更细粒度的聚合。在日志类比中,这可能是,例如,我从日志条目(文档)中获取 UserAgent 字符串,对其进行一些解析,然后将一些新字段添加回该文档并为其编制索引。

所以我的方法是:

  1. 获取一些我以前没有看过的文档(甚至全部)。例如,我可以通过组合 must_notexists 来查询它们。
  2. 在这些文档上运行我的代码(运行解析器,计算一些新的东西,等等)。
  3. 更新之前获得的文档(可能最好通过批量 api)。

我知道有Update by query API。但这似乎不在这里,因为我需要在我的服务器上运行我自己的代码(顺便说一句,这取决于外部库),而不是作为一个简单的脚本,它不能提供我需要的全面任务。

我正在通过python 访问elasticsearch。

现在的问题是我不知道如何实现上述方法。例如。如果第一步获取的文档量大于myindex.settings.index.max_result_window怎么办?

有什么想法吗?

【问题讨论】:

标签: python elasticsearch cron insert-update elasticsearch-py


【解决方案1】:

我考虑了@Jay 的评论并最终得出了这种模式:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan

from my_module.postprocessing import post_process_doc

es = Elasticsearch(...)
es.ping()

def update_docs( docs ):
    """"""
    for idx,doc in enumerate(docs):
        if idx % 10000 == 0:
            print( 'next 10k' )
        
        new_field_value = post_process_doc( doc )

        doc_update = {
            "_index": doc["_index"],
            "_id" : doc["_id"],
            "_op_type" : "update",
            "doc" : { <<the new field>> : new_field_value }
        }

        yield doc_update

docs = scan( es, query='{ "query" : { "bool": { "must_not": { "exists": { "field": <<the new field>> }} } }}', index=index, scroll="1m", preserve_order=True )

bulk( es, update_docs( docs ) )

评论:

【讨论】:

    猜你喜欢
    • 2017-12-25
    • 2017-02-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-03
    • 1970-01-01
    • 2020-04-18
    相关资源
    最近更新 更多