[Posted]: 2018-09-15 08:44:49
[Problem description]:
I am trying to index documents with the bulk API of the elasticsearch Python package. I am fetching the data from a MySQL DB holding about 10000 records, but my bulk API script only uploads around 5000 of them and then breaks somewhere in the middle.
I get this error: UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)
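That error can be reproduced without Elasticsearch at all: 0xc3 is the lead byte of a UTF-8 multi-byte character, so any implicit decode of such a byte string with the default ascii codec fails. A minimal sketch (the sample bytes are an assumption, standing in for a non-ASCII MySQL column value):

```python
# UTF-8 bytes for u"caf\u00e9", as a MySQL text column may return them
raw = b'caf\xc3\xa9'

try:
    raw.decode('ascii')  # what an implicit str->unicode coercion does
except UnicodeDecodeError as exc:
    print(exc)  # same shape as the error in the question: byte 0xc3 in position 3

# Decoding with the charset the data was actually stored in works fine:
text = raw.decode('utf-8')
```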
def new_products(catid):
    connection = get_connection()
    es = get_elastic_connection()
    cursor = connection.cursor()
    catid = int(catid)
    sql = "SELECT * FROM %s WHERE catid=%d AND product_id<>0 LIMIT %d" % (TABLENAME, catid, LIMIT_PER_THREAD_ON_NEW)
    cursor.execute(sql)
    product_ids_result = cursor.fetchall()
    product_ids_only = map(lambda x: x['product_id'], product_ids_result)
    product_ids_indexes = {}
    for row in product_ids_result:
        product_ids_indexes[row['product_id']] = row['id']
    products_list = []
    if product_ids_only:
        sql = "SELECT * FROM tbl_products WHERE catid=%d AND product_id IN (%s)" % (catid, ','.join(map(str, product_ids_only)))
        cursor.execute(sql)
        products_list = cursor.fetchall()
    while products_list:
        print catid, len(products_list)
        product_ids_from_db = map(lambda x: x['pid'], products_list)
        product_images = get_images(product_ids_from_db)
        product_specs = get_specs(catid, product_ids_from_db)
        bulk_data = []
        for row in products_list:
            row['p_spec'] = {'d_spec': [], 'f_spec': []}
            if row['pid'] in product_specs:
                if product_specs[row['pid']].has_key('d_spec'):
                    row['p_spec']['d_spec'] = product_specs[row['pid']]['d_spec']
                if product_specs[row['pid']].has_key('f_spec'):
                    row['p_spec']['f_spec'] = product_specs[row['pid']]['f_spec']
            if row['pid'] in product_images:
                if product_images[row['pid']]:
                    row['pimg'] = product_images[row['pid']]
                    row['no_img'] = '1'
            bulk_data.append({
                "index": {
                    '_index': ES_INDEX,
                    '_type': ES_TYPE,
                    '_id': row['pid']
                }
            })
            bulk_data.append(row)
            if len(bulk_data) == ES_LIMIT_PER_REQUEST:
                responses = es.bulk(index=ES_INDEX, body=bulk_data, refresh=True)
                bulk_data = []
        if len(bulk_data) > 0:
            responses = es.bulk(index=ES_INDEX, body=bulk_data, refresh=True)
        sql = "SELECT * FROM %s WHERE catid=%d AND product_id<>0 LIMIT %d" % (TABLENAME, catid, LIMIT_PER_THREAD_ON_NEW)
        cursor.execute(sql)
        new_product_ids_result = cursor.fetchall()
        new_product_ids_only = map(lambda x: x['product_id'], new_product_ids_result)
        if set(product_ids_only) == set(new_product_ids_only):
            print catid, "new products are same"
            break
        else:
            product_ids_only = new_product_ids_only
            if new_product_ids_only:
                sql = "SELECT * FROM tbl_products WHERE catid=%d AND product_id IN (%s)" % (catid, ','.join(map(str, new_product_ids_only)))
                cursor.execute(sql)
                products_list = cursor.fetchall()
            else:
                products_list = []
    connection.close()
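As an aside, the elasticsearch-py library ships a `helpers.bulk` function that does this request-size chunking itself, so the manual `ES_LIMIT_PER_REQUEST` bookkeeping can be dropped. A minimal sketch of the same action stream (field names `pid` and the index/type constants are taken from the question; everything else is an assumption):

```python
def generate_actions(rows, index, doc_type):
    """Yield one bulk action per product row, in the dict format
    that elasticsearch.helpers.bulk() accepts."""
    for row in rows:
        yield {
            '_index': index,
            '_type': doc_type,
            '_id': row['pid'],
            '_source': row,
        }

# Usage sketch (assumes `es` is an Elasticsearch client and `rows`
# is the list of dicts fetched from MySQL):
# from elasticsearch import helpers
# helpers.bulk(es, generate_actions(rows, ES_INDEX, ES_TYPE), chunk_size=1000)
```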
Any clues as to what is going wrong here?
Regards
[Discussion]:
-
Can you post the last few lines of your /var/log/elasticsearch/elasticsearch.log?
-
Only server startup messages are in there.
-
What is the value of ES_LIMIT_PER_REQUEST?
-
The limit per request is 1K.
-
In the end it turned out to be an encoding problem: UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)
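A hedged workaround sketch for that encoding problem: decode every byte-string column to unicode before handing the rows to es.bulk(), so serialization never falls back to the implicit ascii codec. (Alternatively, opening the MySQLdb connection with charset='utf8' and use_unicode=True makes the driver return unicode strings directly; the helper names below are hypothetical.)

```python
def to_unicode(value, encoding='utf-8'):
    # Only byte strings need decoding; ints, None, and unicode pass through.
    # encoding='utf-8' is an assumption -- use whatever charset the DB stores.
    if isinstance(value, bytes):
        return value.decode(encoding)
    return value

def clean_row(row):
    """Return a copy of a DB row dict with all byte-string values decoded."""
    return {k: to_unicode(v) for k, v in row.items()}
```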
标签: python python-2.7 elasticsearch indexing bulk