【问题标题】:Python: 'before' and 'after' for multiprocessing workersPython:多处理工作者的“之前”和“之后”
【发布时间】:2025-12-17 10:10:01
【问题描述】:

更新:这里有一个更具体的例子

假设我想从一组相当大的文件中编译一些统计数据: 我可以制作一个生成器(line for line in fileinput.input(files)) 和一些处理器:

from collections import defaultdict 
scores = defaultdict(int) 

def process(line): 
    if 'Result' in line: 
        res = line.split('\"')[1].split('-')[0]
        scores[res] += 1

问题是当一个人到达multiprocessing.Pool 时如何处理这个问题。

当然可以定义multiprocessing.sharedctypes 和自定义struct 而不是defaultdict,但这似乎相当痛苦。另一方面,我想不出在进程之前实例化某些东西或在生成器用完主线程后返回某些东西的pythonic方法。

【问题讨论】:

  • 也许其他人明白你的问题是什么......但我想我不明白。你能进一步澄清吗?
  • 你了解 else 子句在 for 循环中的作用吗?
  • 您要解决什么问题?您想知道每个项目何时完成吗?当他们全部完成?还有什么?
  • 第一段代码在做什么?通常elsefor 子句之后表示一个结构,表示“如果循环没有遇到break 语句,则执行此操作”,但只有process(i) 我不确定你期望else 做什么在这里做。
  • 让我把问题说得更具体一些,对不起各位

标签: python multiprocessing generator


【解决方案1】:

所以你基本上创建了一个直方图。这可以很容易地并行化,因为直方图可以合并而不复杂。有人可能想说这个问题很容易并行化或"embarrassingly parallel"。也就是说,您无需担心工作人员之间的通信。

只需将您的数据集分成多个块,让您的工作人员独立处理这些块,收集每个工作人员的直方图,然后合并直方图。

在实践中,最好让每个工作人员处理/读取自己的文件来解决这个问题。也就是说,“任务”可以是文件名。您不应该开始腌制文件内容并通过管道在进程之间发送它们。让每个工作进程直接从文件中检索批量数据。否则你的架构会花费太多时间在进程间通信上,而不是做一些真正的工作。

你需要一个例子还是你自己能解决这个问题?

编辑:示例实现

我有许多数据文件的文件名采用这种格式:data0.txtdata1.txt、...。

示例内容:

wolf
wolf
cat
blume
eisenbahn

目标是在数据文件中包含的单词上创建直方图。这是代码:

from multiprocessing import Pool
from collections import Counter
import glob


def build_histogram(filepath):
    """This function is run by a worker process.
    The `filepath` argument is communicated to the worker
    through a pipe. The return value of this function is
    communicated to the manager through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            hist[line.strip()] += 1
    return hist


def main():
    """This function runs in the manager (main) process."""

    # Collect paths to data files.
    datafile_paths = glob.glob("data*.txt")

    # Create a pool of worker processes and distribute work.
    # The input to worker processes (function argument) as well
    # as the output by worker processes is transmitted through
    # pipes, behind the scenes.
    pool = Pool(processes=3)
    histograms = pool.map(build_histogram, datafile_paths)

    # Properly shut down the pool of worker processes, and
    # wait until all of them have finished.
    pool.close()
    pool.join()

    # Merge sub-histograms. Do not create too many intermediate
    # objects: update the first sub-histogram with the others.
    # Relevant docs: collections.Counter.update
    merged_hist = histograms[0]
    for h in histograms[1:]:
        merged_hist.update(h)

    for word, count in merged_hist.items():
        print "%s: %s" % (word, count)


if __name__ == "__main__":
    main()

测试输出:

python countwords.py
eisenbahn: 12
auto: 6
cat: 1
katze: 10
stadt: 1
wolf: 3
zug: 4
blume: 5
herbert: 14
destruction: 4

【讨论】:

  • 是的,在这种情况下这样做是微不足道的,因为您知道可以在迭代之前拆分数据,但在一般情况下并不能让它变得更好。即,如果 fileinput.input 是单个套接字对象。
  • 你在问“问题是当一个人到达 multiprocessing.Pool 时如何处理这个问题。” --- 在那里,最重要的问题是问题的“并行”性质以及它在多大程度上可以分解为任务。并非每个基于生成器的解决方案都可以转换为标准的 mp Pool 配方!
  • 我的问题是如何在调用 pool.map 时为生成器调用函数之前和之后的函数 - 这对我 islice 文件列表没有帮助,尽管在这种情况下当然可以,也许我的例子不太好。
  • 我想每个人都想知道:生成器的“之前”和“之后”功能是什么?你真的需要更明确的措辞...... :)
  • 我知道,我想我只是不擅长提出具体问题 - 即 init 在迭代之前由池生成的进程内存中的计数器,然后返回值生成器为空后的那个计数器。
【解决方案2】:

我不得不修改原来的 pool.py(问题是 worker 被定义为一个没有任何继承的方法)以获得我想要的,但它并没有那么糟糕,并且可能比完全编写一个新池更好。

class worker(object):
    def __init__(self, inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False, finalizer=None, finargs=()): 
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        self.completed = 0
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
        if initializer is not None:
            initializer(self, *initargs)

        def run(self): 
            while maxtasks is None or (maxtasks and self.completed < maxtasks):
                try:
                    task = get()
                except (EOFError, OSError):
                    util.debug('worker got EOFError or OSError -- exiting')
                    break

                if task is None:
                    util.debug('worker got sentinel -- exiting')
                    break

                job, i, func, args, kwds = task
                try:
                    result = (True, func(*args, **kwds))
                except Exception as e:
                    if wrap_exception:
                        e = ExceptionWithTraceback(e, e.__traceback__)
                    result = (False, e)
                try:
                    put((job, i, result))
                except Exception as e:
                    wrapped = MaybeEncodingError(e, result[1])
                    util.debug("Possible encoding error while sending result: %s" % (
                        wrapped))
                    put((job, i, (False, wrapped)))
                self.completed += 1
            if finalizer:
                finalizer(self, *finargs)
            util.debug('worker exiting after %d tasks' % self.completed)
        run(self)

【讨论】: