【问题标题】:Processing huge CSV file using Python and multithreading使用 Python 和多线程处理巨大的 CSV 文件
【发布时间】:2017-07-06 13:51:16
【问题描述】:

我有一个函数可以懒惰地从一个巨大的 CSV 文件中生成行:

def get_next_line():
    with open(sample_csv,'r') as f:
        for line in f:
            yield line

def do_long_operation(row):
    print('Do some operation that takes a long time')

我需要使用线程,这样我从上述函数中获得的每条记录都可以调用do_long_operation

互联网上的大多数地方都有这样的例子,我不太确定我是否走在正确的道路上。

import threading
thread_list = []
for i in range(8):
   t = threading.Thread(target=do_long_operation, args=(get_next_row from get_next_line))
   thread_list.append(t)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

我的问题是:

  1. 如何只启动有限数量的线程,比如 8 个?

  2. 如何确保每个线程都能从get_next_line 获得一行?

【问题讨论】:

    标签: multithreading python-3.x


    【解决方案1】:

    您可以使用来自multiprocessing 的线程池并将您的任务映射到工作人员池:

    from multiprocessing.pool import ThreadPool as Pool
    # from multiprocessing import Pool
    from random import randint
    from time import sleep
    
    
    def process_line(l):
        print l, "started"
        sleep(randint(0, 3))
        print l, "done"
    
    
    def get_next_line():
        with open("sample.csv", 'r') as f:
            for line in f:
                yield line
    
    f = get_next_line()
    
    t = Pool(processes=8)
    
    for i in f:
        t.map(process_line, (i,))
    t.close()
    t.join()
    

    这将创建八名工人,并将您的线路一一提交给他们。一旦一个进程“空闲”,它就会被分配一个新任务。

    还有一个注释掉的导入语句。如果您注释掉 ThreadPool 并改为从 multiprocessing 导入 Pool,您将获得子进程而不是线程,这在您的情况下可能更有效。

    【讨论】:

    • 我会改用来自多处理的池。试一试,看看哪个工作得更快,但我很确定这将是多处理解决方案,但如果你需要共享很多内部变量或类似的东西,无论如何这都是糟糕的编程,可能会阻止你使用多处理池。两个池的函数接口完全相同。
    【解决方案2】:

    使用来自多处理的 Pool/ThreadPool 将任务映射到工作池和队列来控制内存中保存的任务数量(因此,如果工作进程很慢,我们就不会提前读入巨大的 CSV 文件):

    from multiprocessing.pool import ThreadPool as Pool
    # from multiprocessing import Pool
    from random import randint
    import time, os
    from multiprocessing import Queue
    
    
    def process_line(l):
        print("{} started".format(l))
        time.sleep(randint(0, 3))
        print("{} done".format(l))
    
    
    def get_next_line():
        with open(sample_csv, 'r') as f:
            for line in f:
                yield line
    
    # use for testing
    # def get_next_line():
    #     for i in range(100):
    #         print('yielding {}'.format(i))
    #         yield i
    
    
    def worker_main(queue):
        print("{} working".format(os.getpid()))
        while True:
            # Get item from queue, block until one is available
            item = queue.get(True)
            if item == None:
                # Shutdown this worker and requeue the item so other workers can shutdown as well
                queue.put(None)
                break
            else:
                # Process item
                process_line(item)
        print("{} done working".format(os.getpid()))
    
    
    f = get_next_line()
    
    # Use a multiprocessing queue with maxsize
    q = Queue(maxsize=5)
    
    # Start workers to process queue items
    t = Pool(processes=8, initializer=worker_main, initargs=(q,))
    
    # Enqueue items. This blocks if the queue is full.
    for l in f:
        q.put(l)
    
    # Enqueue the shutdown message (i.e. None)
    q.put(None)
    
    # We need to first close the pool before joining
    t.close()
    t.join()
    

    【讨论】:

    • 在这个例子中,你将如何返回一个处理过的线元素数组?例如,假设您要返回每行中的第一个元素,您将如何将它们迭代地附加到一个空列表中,然后返回最终的值列表?
    • @user8188120 您可以使用第二个队列(输出队列),工作线程会将已处理的元素写入该队列。创建一个队列并使用initargs 将其传递给worker_main,然后在worker_main 中将处理后的元素附加到队列中。您可以有另一个池从输出队列中读取已处理的元素或在最后读取它们(在t.join() 之后)。
    【解决方案3】:

    Hannu's answer 不是最好的方法。 我在一个 100M 行的 CSV 文件上运行了代码。我花了很长时间才完成手术。

    但是,在阅读他的答案之前,我已经编写了以下代码:

    def call_processing_rows_pickably(row):
        process_row(row)
    
    import csv
    from multiprocessing import Pool
    import time
    import datetime
    
    def process_row(row):
        row_to_be_printed = str(row)+str("hola!")
        print(row_to_be_printed)
    
    class process_csv():
    
        def __init__(self, file_name):
            self.file_name = file_name
    
        def get_row_count(self):
            with open(self.file_name) as f:
                for i, l in enumerate(f):
                    pass
            self.row_count = i
    
        def select_chunk_size(self):
            if(self.row_count>10000000):
                self.chunk_size = 100000
                return
            if(self.row_count>5000000):
                self.chunk_size = 50000
                return
            self.chunk_size = 10000
            return
    
        def process_rows(self):
            list_de_rows = []
            count = 0
            with open(self.file_name, 'rb') as file:
                reader = csv.reader(file)
                for row in reader:
                    print(count+1)
                    list_de_rows.append(row)
                    if(len(list_de_rows) == self.chunk_size):
                        p.map(call_processing_rows_pickably, list_de_rows)
                        del list_de_rows[:]
    
        def start_process(self):
            self.get_row_count()
            self.select_chunk_size()
            self.process_rows()
    
    initial = datetime.datetime.now()
    p = Pool(4)
    ob = process_csv("100M_primes.csv")
    ob.start_process()
    final = datetime.datetime.now()
    print(final-initial)
    

    这需要 22 分钟。显然,我需要有更多的改进。例如,R 中的 Fred 库最多需要 10 分钟来完成这项任务。

    不同之处在于:我先创建一个 100k 行的块,然后将其传递给由 threadpool(here, 4 threads) 映射的函数。

    【讨论】:

    • 我认为这在 python3 中不起作用,因为 csv.reader 只能从以 'r' 模式打开的文件中读取
    猜你喜欢
    • 2010-11-13
    • 1970-01-01
    • 2017-02-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-01
    • 2017-09-14
    • 1970-01-01
    相关资源
    最近更新 更多