【问题标题】:Progress measuring with python's multiprocessing Pool and map function使用 python 的多处理池和映射功能测量进度
【发布时间】:2015-11-17 07:31:39
【问题描述】:

以下我用于并行 csv 处理的代码:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp

def init_worker(x):
  sleep(.5)
  print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rt")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    results = p.map(init_worker, csvReader, chunksize = 10)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()
    p.terminate()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # WRITE RESULTS TO OUTPUT FILE
  [csvWriter.writerow(row) for row in results]

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(results)
  # print len(results)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

我想以某种方式衡量脚本的进度(只是纯文本而不是任何花哨的 ASCII 艺术)。我想到的一个选择是将init_worker 成功处理的行与 input.csv 中的所有行进行比较,并打印实际状态,例如每一秒,你能指点我正确的解决方案吗?我发现了几篇有类似问题的文章,但我无法使其适应我的需求,因为它们都没有使用 Pool 类和 map 方法。我还想问一下p.close(), p.join(), p.terminate() 方法,我主要在Process 而不是Pool 类中看到它们,Pool 类是否需要它们并且我是否正确使用它们?使用 p.terminate() 的目的是用 ctrl+c 杀死进程,但这是 different 的故事,还没有一个圆满的结局。谢谢。

PS:如果重要的话,我的 input.csv 看起来像这样:

0,0
1,3
2,6
3,9
...
...
48,144
49,147

PPS:正如我所说,我是multiprocessing 的新手,我编写的代码可以正常工作。我可以看到的一个缺点是整个 csv 都存储在内存中,所以如果你们有更好的想法,请不要犹豫分享它。

编辑

回复@J.F.Sebastian

这是根据您的建议我的实际代码:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm

def do_job(x):
  sleep(.5)
  # print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):

  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
      csvWriter.writerow(result)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(result)
  # print len(result)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

这是tqdm的输出:

1 [elapsed: 00:05,  0.20 iters/sec]

这个输出是什么意思?在您提到的页面上,tqdm 在循环中使用如下方式:

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
...     time.sleep(1)
... 
|###-------| 35/100  35% [elapsed: 00:35 left: 01:05,  1.00 iters/sec]

这个输出是有道理的,但我的输出是什么意思?此外,似乎 ctrl+c 问题没有得到解决:在点击 ctrl+c 脚本后会抛出一些 Traceback,如果我再次点击 ctrl+c,那么我会得到新的 Traceback,依此类推。杀死它的唯一方法是将其发送到后台 (ctr+z) 然后将其杀死 (kill %1)

【问题讨论】:

  • 无关:在 Python 2 上对 csv 文件使用 'rb' 模式。
  • 另外,init_worker 名称具有误导性。在您的情况下,init_worker 可能会在同一个工作进程中运行多次。
  • 无关:在p.join()之后再调用p.terminate()是没有意义的。

标签: python multithreading csv parallel-processing python-multiprocessing


【解决方案1】:

要显示进度,请将pool.map 替换为pool.imap_unordered

from tqdm import tqdm # $ pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
    csvWriter.writerow(result)

tqdm 部分是可选的,参见Text Progress Bar in the Console

意外地,它修复了您的“整个 csv 存储在内存中” 和“没有引发键盘中断”的问题。

这是一个完整的代码示例:

#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                        datefmt="%F %T", level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
            logging.debug(square) # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()

您应该每隔.5 * chunksize 秒看到一次批量输出。如果按Ctrl+C;您应该会在子进程和主进程中看到KeyboardInterrupt。在 Python 3 中,主进程立即退出。在 Python 2 中,KeyboardInterrupt 会延迟到应该打印下一批(Python 中的错误)。

【讨论】:

  • 不幸的是,这似乎没有帮助,请检查已编辑的 OP。非常感谢你。
  • @WakanTanka: (1) 如果您不理解tqdm 的输出,那么只需将其放下,然后打印每次迭代的进度报告,但您喜欢。 (2) 在异常处理程序中使用p.terminate()。放在之前 join()
  • 嗨@J.F.塞巴斯蒂安抱歉稍后回复。我了解 tqdm 的输出,但问题是它是按进程报告的,而不是针对整个任务报告的。 PS:我在except KeyboardInterrupt: 分支中将p.terminate() 放在p.close()p.join() 之间,按下ctrl+c 后我仍然得到相同的行为。您能否发布整个有效的代码,以便我接受答案。谢谢。
猜你喜欢
  • 2017-04-09
  • 1970-01-01
  • 1970-01-01
  • 2015-06-08
  • 2011-08-05
  • 1970-01-01
  • 1970-01-01
  • 2020-03-12
  • 1970-01-01
相关资源
最近更新 更多