【问题标题】:Python Multi-Processing Question?Python多处理问题?
【发布时间】:2013-06-17 02:54:23
【问题描述】:

我有一个包含 500 个输入文件的文件夹(所有文件的总大小约为 500[MB])。

我想编写一个 python 脚本,它执行以下操作:

(1)将所有输入文件加载到内存中

(2) 初始化一个空的 python 列表,稍后将使用该列表...参见项目符号 (4)

(3) 启动 15 个不同的(独立的)进程:每个进程都使用相同的输入数据 [来自(1)]——但使用不同的算法对其进行处理,从而产生不同的结果

(4) 我希望所有独立进程 [从步骤 (3)] 将它们的输出存储在同一个 python 列表中 [在步骤 (2) 中初始化的同一个列表]

一旦所有 15 个进程都完成运行,我将拥有包含所有 15 个独立进程的结果的 one python list

我的问题是,是否可以在python 中有效地完成上述操作?如果是这样,您能否提供一个方案/示例代码来说明如何做到这一点?

注意 #1: 我将在强大的多核服务器上运行它;所以这里的目标是使用所有的处理能力,同时在所有独立进程之间共享一些内存 {input data, output list}。

注意 #2:我在 Linux 环境中工作

【问题讨论】:

  • 这根本无法正常工作。由于对一个列表的所有争论,您可能不会从并发中看到太多好处。
  • @Rafe Kettler:你有什么建议?我的目标是能够在处理结束时“汇总”结果;有什么选择? (为了清楚起见,output list 的大小约为 100[MB])。
  • 我可以看到它与 zeromq 一起工作。多个发布者,一个订阅者。订阅者管理列表。所以我的问题是#1:第三部分库在桌面上吗?和 #2:如何表示输出?
  • 我会在独立列表上工作,然后立即将它们汇总。显然,这种方法会使用更多的内存,所以你可能不得不去磁盘一些。
  • @user3262424 如您所见,我将一个演示应用程序放在一起演示如何使用 0mq 做类似的事情。我认为问题的症结在于进程之间的通信。 zeromq 使这比多处理 IMO 提供的机制容易得多。它为您的特定问题增加的价值是一个订阅者与多个发布者进行通信。无论如何希望有帮助

标签: python zeromq


【解决方案1】:

好的,我刚刚使用zeromq 进行了此操作,以向多个发布者展示单个订阅者。您可能可以对队列做同样的事情,但您需要对它们进行更多管理。 zeromq 套接字可以正常工作,这对于 IMO 之类的东西来说非常有用。

"""
demo of multiple processes doing processing and publishing the results
to a common subscriber
"""
from multiprocessing import Process


class Worker(Process):
    def __init__(self, filename, bind):
        self._filename = filename
        self._bind = bind
        super(Worker, self).__init__()

    def run(self):
        import zmq
        import time
        ctx = zmq.Context()
        result_publisher = ctx.socket(zmq.PUB)
        result_publisher.bind(self._bind)
        time.sleep(1)
        with open(self._filename) as my_input:
            for l in my_input.readlines():
                result_publisher.send(l)

if __name__ == '__main__':
    import sys
    import os
    import zmq

    #assume every argument but the first is a file to be processed
    files = sys.argv[1:]

    # create a worker for each file to be processed if it exists pass
    # in a bind argument instructing the socket to communicate via ipc
    workers = [Worker(f, "ipc://%s_%s" % (f, i)) for i, f \
               in enumerate((x for x in files if os.path.exists(x)))]

    # create subscriber socket
    ctx = zmq.Context()

    result_subscriber = ctx.socket(zmq.SUB)
    result_subscriber.setsockopt(zmq.SUBSCRIBE, "")

    # wire up subscriber to whatever the worker is bound to 
    for w in workers:
        print w._bind
        result_subscriber.connect(w._bind)

    # start workers
    for w in workers:
        print "starting workers..."
        w.start()

    result = []

    # read from the subscriber and add it to the result list as long
    # as at least one worker is alive
    while [w for w in workers if w.is_alive()]:
        result.append(result_subscriber.recv())
    else:
        # output the result
        print result

哦,只是为了得到 zmq

$ pip install pyzmq-static

【讨论】:

  • @Tom Willis:谢谢!上面的代码会使用服务器的所有处理能力吗?另外,您能否确认存储该输出的list 将在内存中进行操作,而无需访问磁盘?另外,当我运行 linux 命令top 时——我会看到 1 个python 进程还是 15 个不同的进程?
  • 您将在顶部看到您运行的每个工作人员 + 主进程的进程,在上面的代码中,该列表位于内存中,这是好是坏取决于您的目的。上面的脚本唯一一次访问磁盘是读取作为参数传入的文件。我只是想演示如何完成进程间通信以及使用 0mq 是多么容易。
  • 最后一件事可能并不明显。进程之间的消息必须是字符串。如果您需要更多的数据结构,请使用 json 之类的东西
  • +1 最初的请求并不像写的那样真正可行,但正确的做法是使用多个独立的 Python 进程,而 zmq 是将它们捆绑在一起的好方法。如果您的多核机器不够用,那么只需将套接字更改为使用 tcp://my_ip_address:10001,您就可以让它们在多台机器上运行而没有问题。问题中的错误是首先将所有数据加载到内存中,然后启动进程。将其保留在文件中,然后通过 PUB-SUB 队列将答案发送给主进程。
  • @user 非常欢迎您。 zeromq 真的很酷。如果您的问题可以表示为它支持的网络拓扑的混合,它会使进程间通信非常容易。花一个周末玩弄它,跟着文档一起学习,你可能会发现它为解决​​涉及多个进程的问题打开了一个充满可能性的世界。
猜你喜欢
  • 2016-08-28
  • 2019-02-07
  • 2020-02-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-10-19
相关资源
最近更新 更多