【问题标题】:Connection timeout on Elasticsearch parallel_bulkElasticsearch parallel_bulk 上的连接超时
【发布时间】:2018-08-17 03:09:55
【问题描述】:

我正在尝试使用 Elasticsearch parallel_bulk 导入大量数据。 这是我的索引结构:

{
    "_index" : "myindex",
    "_type" : domain,
    "_id" : md5(email),
    "_score" : 1.0,
    "_source" : {
      "purchase_date" : purchase_date,
      "amount" : amount,
    }
}

这是我的python代码:

def insert(input_file):
    paramL = []

    with open(input_file) as f:
        for line in f:
            line = line.rstrip()

            fields = line.split(',')
            purchase_date = fields[0]
            amount = fields[1]
            email = fields[2]               

            id_email = getMD5(email)

            doc = {
                "email": email,
                "purchase_date": purchase_date,
                "amount": amount _date
            }

            ogg = {
                '_op_type': 'index',
                '_index': index_param,
                '_type': doctype_param,
                '_id': id_email,
                '_source': doc
            }

            paramL.append(ogg)    

            if len(paramL) > 500000:
                for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
                    if not success:
                        print "Insert failed: ", info

                # empty paramL if size > 5.000.000
                del paramL[:]

该文件包含 42.644.394 行,我认为每次列表“paramL”大约有 5.000.000 个元素时插入数据。 因此,当我运行脚本时,它会插入大约 436.226 个值,直到它崩溃并出现以下错误:

Traceback(最近一次调用最后一次):文件“test-2-0.py”,行 133,在 main() 文件“test-2-0.py”,第 131 行,在 main 插入(args.file)文件“test-2-0.py”,第 82 行,插入 为了成功,helpers.parallel_bulk(client=es, actions=paramL, thread_count=4) 中的信息:文件 "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/init.py", 第 306 行,并行批量 _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer) 文件 “/usr/lib/python2.7/multiprocessing/pool.py”,第 668 行,在下一个 提高值 elasticsearch.exceptions.ConnectionTimeout:ConnectionTimeout 由 - ReadTimeoutError(HTTPConnectionPool(主机=u'127.0.0.1',端口=9200): 读取超时。 (读取超时=10))

我还尝试在 Elasticsearch 构造函数中增加超时时间

es = Elasticsearch(['127.0.0.1'], request_timeout=30)

但结果是一样的。

【问题讨论】:

    标签: python-2.7 elasticsearch


    【解决方案1】:

    真诚地,我从来没有使用这么多文档进行批量导入。我不知道为什么会出现这个错误。在您的情况下,我建议不要创建列表 -paramL - 而是使用生成器函数管理您的数据 - 正如弹性开发人员在弹性论坛中描述的大批量摄取的最佳实践:https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/3。像这样的:

    def insert(input_file):
    
        with open(input_file) as f:
            for line in f:
                line = line.rstrip()
    
                fields = line.split(',')
                purchase_date = fields[0]
                amount = fields[1]
                email = fields[2]               
    
                id_email = getMD5(email)
    
                doc = {
                    "email": email,
                    "purchase_attack": purchase_date,
                    "amount _relevation": amount _date
                }
    
                yield {
                    '_op_type': 'index',
                    '_index': index_param,
                    '_type': doctype_param,
                    '_id': id_email,
                    '_source': doc
                }
    
    
    
    for success, info in helpers.parallel_bulk(client=es, actions=insert(input_file), thread_count=4):
        if not success:
            print "Insert failed: ", info
    

    在java虚拟机编辑这个文件/etc/elasticsearch/jvm.options可以增加elastic专用空间 要分配 2 GB 的 RAM,您应该更改 - 如果您的机器有 4 GB,您应该为系统保留近 1 GB,这样您最多可以分配 3 GB:

    # Xms represents the initial size of total heap space
    # Xmx represents the maximum size of total heap space
    
     -Xms2g
     -Xmx2g
    

    那你要重启服务

    sudo service elasticsearch restart
    

    然后再试一次。祝你好运

    【讨论】:

    • 嗨@luponaoide,我定制了你的建议(在我之前的问题中)。我不得不以这种方式放置“parallel_bulk”函数,否则当 paramL 超过收集值的大约 7.000.000 时脚本会崩溃。事实上,最初我使用的逻辑与您的类似,但正如解释的那样,它崩溃了。
    • hi @bit 您是否尝试过使用生成器,就像我的回答一样,不使用 paramL?
    • Ciao @bit - 我也是意大利人 - di Roma,所以我很理解你的日志 - connessione interrotta dal corrispondente ,英文版是由对等方重置连接。这是一个非常烦人的异常——不是错误——因为大多数时候你可以忽略这个异常。可能和很多情况有关,最好去Elastic开发者监控的Elastic官方论坛寻求帮助。但是,该系统似乎确实处于压力之下。如果你可以为 elasticseach 分配更多的 ram - 如果我会告诉你怎么做,我会尝试这种方式
    • 我也来自罗马...ti devo un caffè allora! XD 我会尝试提高性能。这是当前配置: - Intel Xeon 3199MHz - RAM 4GB - swap 4GB
    • @bit 我已经使用为 ES 增加 RAM 的说明升级了我的答案。 Per il caffé volentieri, eddaje! XD la cosa migliore sarebbe consigliarmi alla tua società qualora gli servisse un altro sviluppatore python - 弹性。 È davvero un 赌场 cambiar lavoro qui!
    猜你喜欢
    • 2015-04-01
    • 2021-04-13
    • 2021-06-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-14
    • 1970-01-01
    • 2011-08-01
    • 2019-06-20
    相关资源
    最近更新 更多