【Question Title】: Python elasticsearch bulk API is not working as expected
【Posted】: 2018-09-15 08:44:49
【Question Description】:

I am trying to index documents with the bulk API using the elasticsearch Python package. I am fetching the data from a MySQL DB that has around 10,000 records. However, my Python bulk API script uploads only about 5,000 records and then breaks somewhere in the middle.

I am getting this error: UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)
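That traceback comes from Python 2 falling back to the default ascii codec on UTF-8 bytes returned by the MySQL driver. A minimal standalone reproduction (the byte string below is illustrative, not taken from the asker's data):

```python
# 0xc3 is the lead byte of a UTF-8 multi-byte sequence; here it is the
# first byte of 'é' in "café" encoded as UTF-8. Decoding those bytes
# with the ascii codec reproduces the exact error from the question.
raw = b'caf\xc3\xa9'  # UTF-8 bytes for "café"; 0xc3 sits at position 3

try:
    raw.decode('ascii')
except UnicodeDecodeError as exc:
    print(exc)  # 'ascii' codec can't decode byte 0xc3 in position 3: ...

print(raw.decode('utf-8'))  # café
```

Any string column containing a non-ASCII character (accented letters, etc.) will trigger this as soon as the driver's raw bytes are coerced to unicode, which is why the script dies partway through the data rather than immediately.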

def new_products(catid):
    connection = get_connection()
    es = get_elastic_connection()
    cursor = connection.cursor()
    catid = int(catid)
    sql = "SELECT  * FROM %s WHERE catid=%d AND product_id<>0 LIMIT %d" % (TABLENAME, catid, LIMIT_PER_THREAD_ON_NEW)

    cursor.execute(sql)
    product_ids_result = cursor.fetchall()
    product_ids_only = map(lambda x: x['product_id'], product_ids_result)
    product_ids_indexes = {}
    for row in product_ids_result:
        product_ids_indexes[row['product_id']] = row['id']

    products_list = []
    if product_ids_only:
        sql = "SELECT * FROM tbl_products WHERE catid=%d AND product_id IN (%s)" % (catid, ','.join(map(str, product_ids_only)))

        cursor.execute(sql)
        products_list = cursor.fetchall()

    while products_list:
        print catid, len(products_list)
        product_ids_from_db = map(lambda x: x['pid'], products_list)
        product_images = get_images(product_ids_from_db)
        product_specs = get_specs(catid, product_ids_from_db)

        bulk_data = []
        for row in products_list:
            row['p_spec'] = {'d_spec': [], 'f_spec': []}
            if row['pid'] in product_specs:
                if 'd_spec' in product_specs[row['pid']]:
                    row['p_spec']['d_spec'] = product_specs[row['pid']]['d_spec']
                if 'f_spec' in product_specs[row['pid']]:
                    row['p_spec']['f_spec'] = product_specs[row['pid']]['f_spec']

            if row['pid'] in product_images:
                if product_images[row['pid']]:
                    row['pimg'] = product_images[row['pid']]
                    row['no_img'] = '1'

            bulk_data.append({
                "index": {
                    '_index': ES_INDEX,
                    '_type': ES_TYPE,
                    '_id': row['pid']
                }
            })
            bulk_data.append(row)

            # use >= rather than ==: each product appends TWO entries
            # (action header + source document), so an odd limit would
            # never match exactly and the buffer would grow unbounded
            if len(bulk_data) >= ES_LIMIT_PER_REQUEST:
                responses = es.bulk(index=ES_INDEX, body=bulk_data, refresh=True)
                bulk_data = []

        if len(bulk_data) > 0:
            responses = es.bulk(index=ES_INDEX, body=bulk_data, refresh=True)


        sql = "SELECT  * FROM %s WHERE catid=%d AND product_id<>0 LIMIT %d" % (TABLENAME, catid, LIMIT_PER_THREAD_ON_NEW)
        cursor.execute(sql)
        new_product_ids_result = cursor.fetchall()
        new_product_ids_only = map(lambda x: x['product_id'], new_product_ids_result)

        if set(product_ids_only) == set(new_product_ids_only):
            print catid, "new products are same"
            break
        else:
            product_ids_only = new_product_ids_only

        if new_product_ids_only:
            sql = "SELECT * FROM tbl_products WHERE catid=%d AND product_id IN (%s)" % (catid, ','.join(map(str, new_product_ids_only)))

            cursor.execute(sql)
            products_list = cursor.fetchall()
        else:
            products_list = []

    connection.close()
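One subtlety in the batching loop above: bulk_data grows by two entries per product (an action header plus the source document), so a flush check that tests for exact equality against an odd limit never triggers. A standalone sketch (hypothetical helper, not part of the original script) of the difference between an == and a >= flush check:

```python
def flushes_with(check, n_products, limit):
    """Simulate the flush loop; return (flush count, entries left in buffer)."""
    buf, flushes = 0, 0
    for _ in range(n_products):
        buf += 2  # each product adds an action header AND a source doc
        if check(buf, limit):
            flushes += 1
            buf = 0
    return flushes, buf

# With an odd limit, equality never fires: the buffer only grows.
print(flushes_with(lambda b, l: b == l, 10, 5))  # (0, 20)
print(flushes_with(lambda b, l: b >= l, 10, 5))  # (3, 2)
```

Flushing on >= (or counting products instead of buffer entries) keeps each bulk request bounded regardless of whether the limit is even.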

Any clue as to what is going wrong here?

Regards

【Comments】:

  • Can you post the last few lines of your /var/log/elasticsearch/elasticsearch.log?
  • Only the server startup messages are there.
  • What is the value of ES_LIMIT_PER_REQUEST?
  • The limit per request is 1K.
  • In the end I ran into an encoding problem: UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)

Tags: python python-2.7 elasticsearch indexing bulk


【Solution 1】:

I found the problem.

I was actually using multiple threads to index the data, which is why I wasn't seeing any error at runtime.

In the end I fixed it by passing charset and use_unicode as arguments to the MySQLdb.connect function.
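A sketch of the fix described in the answer, assuming the MySQLdb (mysqlclient) driver; the host/user/password/db values are placeholders:

```python
import MySQLdb  # the MySQL-Python / mysqlclient driver

connection = MySQLdb.connect(
    host='localhost',      # placeholder connection details
    user='dbuser',
    passwd='secret',
    db='mydb',
    charset='utf8',        # decode column data as UTF-8 on the wire
    use_unicode=True,      # return unicode strings instead of raw bytes
)
```

With these two arguments the cursor returns unicode objects instead of raw UTF-8 byte strings, so the JSON serialization done for es.bulk never falls back to the ascii codec.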

【Discussion】:
