【Question Title】: Python multiprocessing output result
【Posted】: 2021-05-24 09:58:11
【Question Description】:

Given a list of data to process and a 64-core CPU (plus 500 GB of RAM). The list should be sorted by string and the data stored in a result set of millions of records; that part runs fine, and the multiprocessing step takes only a few seconds. But I also need to store the results somehow, as txt or csv output or in a database. So far I haven't found a workable solution, because after the first part (process) finishes, the insert method either throws errors when I try to use a MySQL pool, or takes far too long to produce txt output. What I've tried so far: simple txt output, printing to a txt file, and the csv, pandas and numpy libraries. Nothing seems to speed it up. Any help would be appreciated! My current code:

import os
import re
import sys  # needed for sys.argv below
import datetime
import time
import csv

import mysql.connector as connector
from mysql.connector.pooling import MySQLConnectionPool

import mysql

import numpy as np
from tqdm import tqdm
from time import sleep
import multiprocessing as mp

import numpy

pool = MySQLConnectionPool( pool_name="sql_pool",
                            pool_size=32,
                            pool_reset_session=True,
                            host="localhost",
                            port="3306",
                            user="homestead",
                            password="secret",
                            database="homestead")

# # sql connection
db = mysql.connector.connect(
  host="localhost",
  port="3306",
  user="homestead",
  password="secret",
  database="homestead"
)

sql_cursor = db.cursor()
delete_statement = "DELETE FROM statistics"
sql_cursor.execute(delete_statement)

db.commit()

sql_statement = "INSERT INTO statistics (name, cnt) VALUES (%s, %s)"

list = []
domains = mp.Manager().list()
unique_list = mp.Manager().list()
invalid_emails = mp.Manager().list()
result = mp.Manager().list()
regex_email = r'^(\w|\.|\_|\-)+[@](\w|\_|\-|\.)+[.]\w{2,3}$'  # raw string avoids invalid escape-sequence warnings

# check email validity
def check(list, email):
    if(re.search(regex_email, email)):
        domains.append(email.lower().split('@')[1])
        return True
    else:
        invalid_emails.append(email)
        return False
#end of check email validity

# execution time converter
def convertTime(seconds):
    seconds = seconds % (24 * 3600)
    hour = seconds // 3600
    seconds %= 3600
    minutes = seconds // 60
    seconds %= 60

    if(hour == 0):
        if(minutes == 0):
            return "{0} sec".format(seconds)
        else:
            return "{0}min {1}sec".format(minutes, seconds)
    else:
        return "{0}hr {1}min {2}sec".format(hour, minutes, seconds)
# execution time converter end

#process
def process(list):
    for item in tqdm(list):
        if(check(list, item)):
            item = item.lower().split('@')[1]
            if item not in unique_list:
                unique_list.append(item)
# end of process

def insert(list):
    global sql_statement

    # Add to db
    con = pool.get_connection()
    cur = con.cursor()

    print("PID %d: using connection %s" % (os.getpid(), con))
    #cur.executemany(sql_statement, sorted(map(set_result, list)))
    for item in list:

        cur.execute(sql_statement, (item, domains.count(item)))
    con.commit()
    cur.close()
    con.close()

# def insert_into_database(list):
    #sql_cursor.execute(sql_statement, (unique_list, 1), multi=True)

    # sql_cursor.executemany(sql_statement, sorted(map(set_result, list)))
    # db.commit()

# statistics
def statistics(list):
    for item in tqdm(list):
        if(domains.count(item) > 0):
            result.append([domains.count(item), item])
# end of statistics

params = sys.argv
filename = ''
process_count = -1
for i, item in enumerate(params):
    if(item.endswith('.txt')):
        filename = item
    if(item == '--top'):
        process_count = int(params[i+1])


def set_result(item):
    return item, domains.count(item)

# main
if(filename):
    try:
        start_time = time.time()
        now = datetime.datetime.now()
        dirname = "email_stats_{0}".format(now.strftime("%Y%m%d_%H%M%S"))
        os.mkdir(dirname)

        list = open(filename).read().split()

        if(process_count == -1):
            process_count = len(list)

        if(process_count > 0):
            list = list[:process_count]

        #chunking list
        n = int(len(list) /  mp.cpu_count())
        chunks = [list[i:i + n] for i in range(0, len(list), n)]

        processes = []
        print('Processing list on {0} cores...'.format(mp.cpu_count()))
        for chunk in chunks:
            p = mp.Process(target=process, args=[chunk])
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        # insert(unique_list)

        ## step 2 - write sql

        ##  Clearing out db before new data insert
        con = pool.get_connection()
        cur = con.cursor()

        delete_statement = "DELETE FROM statistics"
        cur.execute(delete_statement)

        u_processes = []

        # Maximum pool size for the MySQL connector is 32, so the number of chunks must not exceed that either.
        if(mp.cpu_count() < 32):
            n2 = int(len(unique_list) /  mp.cpu_count())
        else:
            n2 = int(len(unique_list) /  32)

        u_chunks = [unique_list[i:i + n2] for i in range(0, len(unique_list), n2)]
        for u_chunk in u_chunks:
            p = mp.Process(target=insert, args=[u_chunk])
            p.start()
            u_processes.append(p)

        for p in u_processes:
            p.join()

        for p in u_processes:
            p.close()

        # sql_cursor.executemany(sql_statement, sorted(map(set_result, unique_list)))
        # db.commit()
        # for item in tqdm(unique_list):
        #     sql_val = (item, domains.count(item))
        #     sql_cursor.execute(sql_statement, sql_val)
        #
        #     db.commit()

        ## numpy.savetxt('saved.txt', sorted(map(set_result, unique_list)), fmt='%s')


        # with(mp.Pool(mp.cpu_count(), initializer = db) as Pool:
        #     Pool.map_async(insert_into_database(),set(unique_list))
        #     Pool.close()
        #     Pool.join()

        print('Creating statistics for {0} individual domains...'.format(len(unique_list)))

        # unique_list = set(unique_list)
        # with open("{0}/result.txt".format(dirname), "w+") as f:
        #     csv.writer(f).writerows(sorted(map(set_result, unique_list), reverse=True))

        print('Writing final statistics...')
        print('OK.')
        f = open("{0}/stat.txt".format(dirname),"w+")
        f.write("Number of processed emails: {0}\r\n".format(process_count))
        f.write("Number of valid emails: {0}\r\n".format(len(list) - len(invalid_emails)))
        f.write("Number of invalid emails: {0}\r\n".format(len(invalid_emails)))
        f.write("Execution time: {0}".format(convertTime(int(time.time() - start_time))))
        f.close()

    except FileNotFoundError:
        print('File not found, path or file broken.')
else:
    print('Wrong file format, should be a txt file.')
# main

【Question Comments】:

  • It is generally considered bad practice to use a variable name that redefines (and makes unusable) a built-in Python class, e.g. `list = []`.
  • You are using a managed list for the `unique_list` test `if item not in unique_list`. That is an O(N) operation. Wouldn't a managed `dict`, which can emulate a `set`, be better suited for this? (A sketch follows this list.)
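
A minimal sketch of that idea, with hypothetical names (`collect_domains`, `seen`): a managed `dict` stands in for a shared set, so each insert is O(1) instead of a full scan of a proxied list.

import multiprocessing as mp

def collect_domains(chunk, seen):
    # Inserting into the managed dict is O(1); duplicate keys simply
    # overwrite, so no membership test against a proxied list is needed.
    for email in chunk:
        seen[email.lower().split('@')[1]] = None

if __name__ == '__main__':
    emails = ['a@x.com', 'b@y.org', 'c@x.com', 'd@y.org']
    seen = mp.Manager().dict()
    procs = [mp.Process(target=collect_domains, args=(chunk, seen))
             for chunk in (emails[:2], emails[2:])]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(sorted(seen.keys()))  # ['x.com', 'y.org']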

Tags: python parallel-processing multiprocessing mysql-python cpu-usage


【Solution 1】:

See my comments above for some changes you may want to make, one of which may improve performance. But I think the one area where performance can really be improved is your use of managed lists. These are represented by proxies, and every operation on such a list is essentially a remote procedure call and is therefore very slow. You cannot avoid this, since you need multiple processes updating a common, shared list (or a dict, if you take my suggestion). But in the main process you might try, for example, constructing a set from the shared list, as in:

Pool.map_async(insert_into_database(),set(unique_list))

(By the way, that should be `Pool.map(insert_into_database, set(unique_list))`; that is, you have an extra pair of `()`. You could then also drop the calls to `pool.close()` and `pool.join()` if you wanted to.)
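
For illustration, a minimal runnable sketch of the corrected call, with a stand-in `insert_into_database` (the real one would perform the INSERT); the `with` block takes care of cleaning up the pool:

from multiprocessing import Pool

def insert_into_database(domain):
    # Stand-in for the real per-item INSERT.
    print('inserting', domain)

if __name__ == '__main__':
    unique = {'x.com', 'y.org'}   # a native set, already deduplicated
    with Pool(4) as pool:         # no explicit close()/join() needed
        pool.map(insert_into_database, unique)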

The problem is that you are iterating over every element of `unique_list` through a proxy, and that could take a very long time. I say "could" because I think the use of managed lists would already prevent the code as posted (i.e. without outputting any results) from completing in "a few seconds" if we are really talking about "millions" of records, since that means millions of remote procedure calls. But if you could somehow get at the underlying `list` as a native list, the time would certainly be reduced.

First, heed my comment about having declared a variable named `list`, which makes it impossible to create a native list or a subclass of `list`. Once you rename that variable to something more sensible, we can create our own managed class `MyList` that exposes the underlying `list` it is built on. Note that you could do the same with a `MyDict` class subclassing `dict`; I have defined both classes for you. Here is a benchmark showing the difference between constructing a native list from a regular managed list versus from a `MyList`:

import multiprocessing as mp
from multiprocessing.managers import BaseManager
import time

class MyManager(BaseManager):
    pass

class MyList(list):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_underlying_list(self):
        # When called through the proxy, the return value is pickled and
        # sent back, so the caller receives the whole list in one call.
        return self

class MyDict(dict):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_underlying_dict(self):
        return self

# required for windows, which I am running on:
if __name__ == '__main__':
    # A plain managed list: converting it to a native list goes through
    # the proxy, costing one remote call per element.
    l = mp.Manager().list()
    for i in range(100_000):
        l.append(i)
    t = time.time()
    l2 = list(l)
    print(time.time() - t, l2[0:5], l2[-5:])


    MyManager.register('MyList', MyList)
    MyManager.register('MyDict', MyDict)
    my_manager = MyManager()
    # must explicitly start the manager or use: with MyManager() as manager:
    my_manager.start()
    l = my_manager.MyList()
    for i in range(100_000):
        l.append(i)
    t = time.time()
    # A single remote call returns the whole underlying list at once.
    l2 = list(l.get_underlying_list())
    print(time.time() - t, l2[0:5], l2[-5:])

Prints:

7.3949973583221436 [0, 1, 2, 3, 4] [99995, 99996, 99997, 99998, 99999]
0.007997751235961914 [0, 1, 2, 3, 4] [99995, 99996, 99997, 99998, 99999]
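
Applied to the original program, the change would look roughly like this; a sketch that reuses the `MyManager`/`MyList` definitions from the benchmark above, with the worker logic elided:

from multiprocessing.managers import BaseManager

class MyList(list):
    def get_underlying_list(self):
        # One remote call returns the entire list (pickled back as a copy).
        return self

class MyManager(BaseManager):
    pass

MyManager.register('MyList', MyList)

if __name__ == '__main__':
    my_manager = MyManager()
    my_manager.start()

    domains = my_manager.MyList()      # replaces mp.Manager().list()
    unique_list = my_manager.MyList()

    # ... start and join the worker processes exactly as before ...
    unique_list.append('example.com')  # stand-in for worker output

    # One remote call fetches the whole list, instead of one per element.
    native_unique = list(unique_list.get_underlying_list())
    print(native_unique)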

【Discussion】:
