【发布时间】:2018-08-17 03:09:55
【问题描述】:
我正在尝试使用 Elasticsearch parallel_bulk 导入大量数据。 这是我的索引结构:
{
"_index" : "myindex",
"_type" : domain,
"_id" : md5(email),
"_score" : 1.0,
"_source" : {
"purchase_date" : purchase_date,
"amount" : amount,
}
}
这是我的python代码:
def insert(input_file):
paramL = []
with open(input_file) as f:
for line in f:
line = line.rstrip()
fields = line.split(',')
purchase_date = fields[0]
amount = fields[1]
email = fields[2]
id_email = getMD5(email)
doc = {
"email": email,
"purchase_date": purchase_date,
"amount": amount _date
}
ogg = {
'_op_type': 'index',
'_index': index_param,
'_type': doctype_param,
'_id': id_email,
'_source': doc
}
paramL.append(ogg)
if len(paramL) > 500000:
for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
if not success:
print "Insert failed: ", info
# empty paramL if size > 5.000.000
del paramL[:]
该文件包含 42.644.394 行,我认为每次列表“paramL”大约有 5.000.000 个元素时插入数据。 因此,当我运行脚本时,它会插入大约 436.226 个值,直到它崩溃并出现以下错误:
Traceback(最近一次调用最后一次):文件“test-2-0.py”,行 133,在 main() 文件“test-2-0.py”,第 131 行,在 main 插入(args.file)文件“test-2-0.py”,第 82 行,插入 为了成功,helpers.parallel_bulk(client=es, actions=paramL, thread_count=4) 中的信息:文件 "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/init.py", 第 306 行,并行批量 _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer) 文件 “/usr/lib/python2.7/multiprocessing/pool.py”,第 668 行,在下一个 提高值 elasticsearch.exceptions.ConnectionTimeout:ConnectionTimeout 由 - ReadTimeoutError(HTTPConnectionPool(主机=u'127.0.0.1',端口=9200): 读取超时。 (读取超时=10))
我还尝试在 Elasticsearch 构造函数中增加超时时间
es = Elasticsearch(['127.0.0.1'], request_timeout=30)
但结果是一样的。
【问题讨论】: