【发布时间】:2021-05-24 09:58:11
【问题描述】:
给出要处理的数据列表和 64 核 CPU(外加 500 GB RAM)。 该列表应该对字符串进行排序并将数据存储在数百万条记录的结果集中,运行得很好,多处理需要几秒钟。 但我还需要以某种方式将结果存储在 txt、csv 输出或数据库中。到目前为止,我还没有找到一个可行的解决方案,因为在第一部分(过程)之后,插入方法要么在尝试使用 MySQL 池时出错,要么需要很长时间才能给出 txt 输出。 到目前为止我尝试过的:简单的 txt 输出,打印到 txt 文件,使用 csv、pandas 和 numpy 库。似乎没有什么可以加快速度。任何帮助将不胜感激! 我现在的代码:
import os
import re
import datetime
import time
import csv
import mysql.connector as connector
from mysql.connector.pooling import MySQLConnectionPool
import mysql
import numpy as np
from tqdm import tqdm
from time import sleep
import multiprocessing as mp
import numpy
pool = MySQLConnectionPool( pool_name="sql_pool",
pool_size=32,
pool_reset_session=True,
host="localhost",
port="3306",
user="homestead",
password="secret",
database="homestead")
# # sql connection
db = mysql.connector.connect(
host="localhost",
port="3306",
user="homestead",
password="secret",
database="homestead"
)
sql_cursor = db.cursor()
delete_statement = "DELETE FROM statistics"
sql_cursor.execute(delete_statement)
db.commit()
sql_statement = "INSERT INTO statistics (name, cnt) VALUES (%s, %s)"
list = []
domains = mp.Manager().list()
unique_list = mp.Manager().list()
invalid_emails = mp.Manager().list()
result = mp.Manager().list()
regex_email = '^(\w|\.|\_|\-)+[@](\w|\_|\-|\.)+[.]\w{2,3}$'
# check email validity
def check(list, email):
if(re.search(regex_email, email)):
domains.append(email.lower().split('@')[1])
return True
else:
invalid_emails.append(email)
return False
#end of check email validity
# execution time converter
def convertTime(seconds):
seconds = seconds % (24 * 3600)
hour = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
if(hour == 0):
if(minutes == 0):
return "{0} sec".format(seconds)
else:
return "{0}min {1}sec".format(minutes, seconds)
else:
return "{0}hr {1}min {2}sec".format(hour, minutes, seconds)
# execution time converter end
#process
def process(list):
for item in tqdm(list):
if(check(list, item)):
item = item.lower().split('@')[1]
if item not in unique_list:
unique_list.append(item)
# end of process
def insert(list):
global sql_statement
# Add to db
con = pool.get_connection()
cur = con.cursor()
print("PID %d: using connection %s" % (os.getpid(), con))
#cur.executemany(sql_statement, sorted(map(set_result, list)))
for item in list:
cur.execute(sql_statement, (item, domains.count(item)))
con.commit()
cur.close()
con.close()
# def insert_into_database(list):
#sql_cursor.execute(sql_statement, (unique_list, 1), multi=True)
# sql_cursor.executemany(sql_statement, sorted(map(set_result, list)))
# db.commit()
# statistics
def statistics(list):
for item in tqdm(list):
if(domains.count(item) > 0):
result.append([domains.count(item), item])
# end of statistics
params = sys.argv
filename = ''
process_count = -1
for i, item in enumerate(params):
if(item.endswith('.txt')):
filename = item
if(item == '--top'):
process_count = int(params[i+1])
def set_result(item):
return item, domains.count(item)
# main
if(filename):
try:
start_time = time.time()
now = datetime.datetime.now()
dirname = "email_stats_{0}".format(now.strftime("%Y%m%d_%H%M%S"))
os.mkdir(dirname)
list = open(filename).read().split()
if(process_count == -1):
process_count = len(list)
if(process_count > 0):
list = list[:process_count]
#chunking list
n = int(len(list) / mp.cpu_count())
chunks = [list[i:i + n] for i in range(0, len(list), n)]
processes = []
print('Processing list on {0} cores...'.format(mp.cpu_count()))
for chunk in chunks:
p = mp.Process(target=process, args=[chunk])
p.start()
processes.append(p)
for p in processes:
p.join()
# insert(unique_list)
## step 2 - write sql
## Clearing out db before new data insert
con = pool.get_connection()
cur = con.cursor()
delete_statement = "DELETE FROM statistics"
cur.execute(delete_statement)
u_processes = []
#Maximum pool size for sql is 32, so maximum chunk number should be that too.
if(mp.cpu_count() < 32):
n2 = int(len(unique_list) / mp.cpu_count())
else:
n2 = int(len(unique_list) / 32)
u_chunks = [unique_list[i:i + n2] for i in range(0, len(unique_list), n2)]
for u_chunk in u_chunks:
p = mp.Process(target=insert, args=[u_chunk])
p.start()
u_processes.append(p)
for p in u_processes:
p.join()
for p in u_processes:
p.close()
# sql_cursor.executemany(sql_statement, sorted(map(set_result, unique_list)))
# db.commit()
# for item in tqdm(unique_list):
# sql_val = (item, domains.count(item))
# sql_cursor.execute(sql_statement, sql_val)
#
# db.commit()
## numpy.savetxt('saved.txt', sorted(map(set_result, unique_list)), fmt='%s')
# with(mp.Pool(mp.cpu_count(), initializer = db) as Pool:
# Pool.map_async(insert_into_database(),set(unique_list))
# Pool.close()
# Pool.join()
print('Creating statistics for {0} individual domains...'.format(len(unique_list)))
# unique_list = set(unique_list)
# with open("{0}/result.txt".format(dirname), "w+") as f:
# csv.writer(f).writerows(sorted(map(set_result, unique_list), reverse=True))
print('Writing final statistics...')
print('OK.')
f = open("{0}/stat.txt".format(dirname),"w+")
f.write("Number of processed emails: {0}\r\n".format(process_count))
f.write("Number of valid emails: {0}\r\n".format(len(list) - len(invalid_emails)))
f.write("Number of invalid emails: {0}\r\n".format(len(invalid_emails)))
f.write("Execution time: {0}".format(convertTime(int(time.time() - start_time))))
f.close()
except FileNotFoundError:
print('File not found, path or file broken.')
else:
print('Wrong file format, should be a txt file.')
# main
【问题讨论】:
-
通常认为使用重新定义(也使不可用)内置 Python 类的变量名是不明智的做法,例如
list = []. -
您正在使用托管
list进行 'unique_list` 测试if item not in unique_list。这是O(N)操作。可以模拟set的托管dict不是更适合这个吗?
标签: python parallel-processing multiprocessing mysql-python cpu-usage