【问题标题】:elasticsearch-py: error using async_bulk, index missing from actionselasticsearch-py:使用 async_bulk 时出错,操作中缺少索引
【发布时间】:2021-12-29 13:10:57
【问题描述】:

在其文档字符串中,elasticsearch.helpers.async_bulk 将自己描述为一个

:meth:~elasticsearch.AsyncElasticsearch.bulk api 的助手,提供 一个更人性化的界面——它使用一个动作迭代器和 将它们分块发送到elasticsearch。 source

上下文

我已经成功使用AsyncElasticsearch.bulk() 将pandas 数据帧发送到某个ES 实例

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
        yield (json.dumps(record, default=int))

async def send_to_elasticsearch(self, df: DataFrame):
    logger.info(f"{self.stage_name} sending batch to elastic")
    await self.elastic_client.bulk(self._rec_to_actions(df))

问题

但是,当涉及到 async_bulk 时,我收到了 index is missing 错误。

async def send_to_elasticsearch(self, df: DataFrame):
    await async_bulk(self.elastic_client, self._rec_to_actions(df))

尝试以多种方式调整_rec_to_actions(),但效果不佳。

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        record["index"] = self.index
        yield (json.dumps(record, default=int))

我想主要问题是我不太确定什么是动作,在弹性搜索的上下文中。这个概念在文档中无处不在,但在这个库source code 中没有明确的数据结构对应物(反正我找不到)

action 究竟是什么?我应该如何调整生成器以将 df 的数据发送到 self.index

环境

  • python = "3.9.5"
  • elasticsearch = "7.14.1"

【问题讨论】:

    标签: python elasticsearch


    【解决方案1】:

    This documentation 更容易:

    def _rec_to_actions(self, df):
        for record in df.to_dict(orient="records"):
           yield {"_index": self.index, "_source": json.dumps(record, default=int)}
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-29
      • 1970-01-01
      • 2018-08-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多