【发布时间】:2019-11-27 09:40:26
【问题描述】:
我有一个大列表,它本身由 53,000,000 个较小的列表作为元素组成。我想将这些较小的列表中的每一个作为一行以批量大小为 1,000,000 的方式分批提交给数据库,这意味着每次脚本连接到数据库时,它都会提交 1000,000 个元素,然后它会断开与数据库的连接,它再次连接以提交另外 1,000,000 行。
现在我的问题是,如果中间发生错误,例如在提交 50,000,000 行后,我需要删除数据库中的所有行并尝试从头开始提交所有内容。
我在想也许我可以使用 rollback() 来删除现在已经添加的所有 50,000,000 行,但是只要我使用循环,我不知道如何回滚所有提交的 50,000,000 行分批。
有人有建议吗?
这是我的脚本: “结果”是包含 53,000,000 个较小列表作为元素的列表。
batch = []
counter = 0
BATCH_SIZE =1000000
cursor_count = 0
def prepare_names(names):
return [w.replace("'", '') for w in names]
for i in range(len(results)):
if counter < BATCH_SIZE:
batch.append(prepare_names([results[i][0], results[i][1], results[i][2]])) # batch => [[ACC1234.0, 'Some full taxa name'], ...]
counter += 1
else:
batch.append(prepare_names([results[i][0], results[i][1], results[i][2]]))
values = (", ").join([f"('{d[0]}', '{d[1]}', '{d[2]}')" for d in batch])
sql = f"INSERT INTO accession_taxonomy(accession_number, taxon_id, taxonomy) VALUES {values}"
try:
cursor.execute(sql)
db.commit()
except Exception as exception:
print(exception)
print(f"Problem with query: {sql}")
print(cursor.rowcount, "Records Inserted")
cursor_count += cursor.rowcount
counter = 0
batch = []
else:
if batch:
values = (", ").join([f"('{d[0]}', '{d[1]}', '{d[2]}')" for d in batch])
sql = f"INSERT INTO accession_taxonomy(accession_number, taxon_id, taxonomy) VALUES {values}"
try:
cursor.execute(sql)
db.commit()
except Exception as exception:
print(exception)
print(f"Problem with query: {sql}")
print(cursor.rowcount, "Records Inserted")
cursor_count += cursor.rowcount
print("Total Number Of %s Rows Has Been Added." %(cursor_count))
db.close()
【问题讨论】:
-
如果我理解你的脚本,你的 SQL 查询会一次性插入 100 万行,对吧?
-
是的,在每个查询中插入 1,000,000 行。
-
为什么不单独提交每个批次,而是保留一个跟踪哪些数据/批次已经提交的文件?这样,在出现任何错误的情况下(回滚错误批处理后),您可以再次运行代码并继续写入未提交的数据?
标签: python mysql transactions rollback