【发布时间】:2021-12-29 13:10:57
【问题描述】:
在其文档字符串中,elasticsearch.helpers.async_bulk 将自己描述为一个
:meth:
~elasticsearch.AsyncElasticsearch.bulkapi 的助手,提供 一个更人性化的界面——它使用一个动作迭代器和 将它们分块发送到elasticsearch。 source
上下文
我已经成功使用AsyncElasticsearch.bulk() 将pandas 数据帧发送到某个ES 实例
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
问题
但是,当涉及到 async_bulk 时,我收到了 index is missing 错误。
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
尝试以多种方式调整_rec_to_actions(),但效果不佳。
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
我想主要问题是我不太确定什么是动作,在弹性搜索的上下文中。这个概念在文档中无处不在,但在这个库source code 中没有明确的数据结构对应物(反正我找不到)
action 究竟是什么?我应该如何调整生成器以将 df 的数据发送到 self.index?
环境
- python = "3.9.5"
- elasticsearch = "7.14.1"
【问题讨论】:
标签: python elasticsearch