【问题标题】:How to communicate between processes while they're running如何在进程运行时在进程之间进行通信
【发布时间】:2018-01-30 02:08:20
【问题描述】:

我想要两个并行运行的进程。一个从另一个获取输入,处理数据并将处理后的数据作为输出发送回另一个。另一个进程做同样的事情。显然需要有起点和终点。

如何在进程运行时在它们之间进行通信?我只设法依次运行这两个进程。

我尝试用multiprocessing解决它:

from multiprocessing import Process, Queue, Array
sentinel = -1

def process1(q, arr):
    # Receives data, modifies it and sends it back
    while True:
        data = q.get() # Receive data

        if data is sentinel:
            break
    data *= 2 # Data modification
    arr.append(data) # Data logging
    q.put(data) # Send data
    q.put(sentinel) # Send end signal

def process2(q, arr):
    # Receives data, increments it and sends data back
    if q.empty():
        data = 1
    else:
        while True:
            data = q.get() # Receive data
            if data == sentinel:
                break

    data += 1
    q.put(data) # Send data
    arr.append(data) # Data logging
    q.put(sentinel) # Send end signal

if __name__ == "__main__":
    q = Queue()
    logbook = Array('i', 0)
    counter = 0
    while counter < 10:
        process_1 = Process(target=process1, args=(q, logbook))
        process_2 = Process(target=process2, args=(q, logbook))
        process_1.start()
        process_2.start()
        q.close()
        q.join_thread()
        process_1.join()
        process_2.join()
        counter += 1
    print(logbook)

【问题讨论】:

标签: python python-3.x multiprocessing


【解决方案1】:

我试图了解您的需求,但我并不完全清楚,因此我提出了这个生产者-消费者版本的代码,其中两个进程进行通信以达到一定数量的迭代的最终结果。

首先,您需要两个队列,以避免将内容放入队列的同一进程在另一个进程之前读取它。 其次,您需要一种机制来就计算结束达成一致,在本例中为 None 消息。

我提出的解决方案总结为以下代码:

from multiprocessing import Process, Queue, Array

def process1(in_queue, out_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            out_queue.put(None)  # send feedback about END message
            out_queue.close()
            out_queue.join_thread()
            break

        data *= 2 # Data modification
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()

    process_1 = Process(target=process1, args=(q1, q2))
    process_2 = Process(target=process2, args=(q2, q1, 10))
    process_1.start()
    process_2.start()

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()

【讨论】:

  • 这个解决方案涵盖了我打算做的事情!我没有考虑为此目的使用两个队列。非常感谢。
  • 能否简单解释一下,为什么process1可以在发送结束消息之前退出?是否可以通过使用关键字 finally 来解决,即使发生错误也发送结束消息?
  • @ceske 我解决了这个问题,确保在out_queue 上调用closejoin。问题可能是进程退出,而内部线程正在将 None 消息写入队列。确保加入内部线程不会再发生这种行为。
  • 很高兴为您提供帮助,欢迎来到 Stack Overflow。如果此答案或任何其他答案解决了您的问题,请将其标记为已接受。
  • 完成!使用您的解决方案,我什至可以使用第三个队列记录两个进程之间的通信。
【解决方案2】:

您可以使用套接字来完成这项任务,甚至可以使用微服务方法(例如通过 rest api 调用)。

【讨论】:

  • 您能否进一步解释一下您所说的微服务方法是什么意思?我听说过 REST,现在我正试图弄清楚如何在 Python 中实现这种范例。
  • 例如,就像一个网络服务。您提供对模块内服务(函数、方法)的访问。该模块可以通过 REST API 访问,例如使用自上而下的方法作为 OpenApi 规范 (en.wikipedia.org/wiki/OpenAPI_Specification)。
【解决方案3】:

@罗伯托·特拉尼

从您的解决方案开始,我什至能够使用第三个队列记录两个进程之间正在进行的通信。

谢谢,我真的被困住了,不知道如何解决这个问题。

from multiprocessing import Process, Queue

def process1(in_queue, out_queue, log_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            log_queue.put(None) # log END
            out_queue.put(None)  # send feedback about END message
            break

        data *= 2 # Data modification
        print("p1: {}".format(data))
        log_queue.put(data) # Log data
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many, log_queue):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        print("p2: {}".format(data))
        log_queue.put(data) # Log Data
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    log_queue.put(None) # log END
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    logbook = []

    process_1 = Process(target=process1, args=(q1, q2, q3))
    process_2 = Process(target=process2, args=(q2, q1, 10, q3))
    process_1.start()
    process_2.start()

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()

    while True:
        data = q3.get()
        logbook.append(data)
        if data is None:
            break

    q3.close()
    q3.join_thread()

    print(logbook)

【讨论】:

    【解决方案4】:

    您能否进一步解释一下您所说的微服务方法是什么意思?我听说过 REST,现在我正试图弄清楚如何在 Python 中实现这个范例。 ——

    例如,就像一个网络服务。您提供对模块内服务(函数、方法)的访问。该模块可以通过 REST API 访问,例如使用自上而下的方法作为 OpenApi 规范 (https://en.wikipedia.org/wiki/OpenAPI_Specification)。

    我目前正在使用这种方法:设计一个高级接口(模块,每个模块的功能,层次结构和模块的互连);在 openapi 编辑器(在线:https://editor.swagger.io)中的 yaml/json 文件中使用 CRUD(https://en.wikipedia.org/wiki/Create,_read,_update_and_delete)写下该设计以满足 REST 端点;使用编辑器功能生成 python 代码(flask);编辑样板代码以实际实现后端功能;运行服务器以提供对 API 方法的访问。 您甚至可以将模块转换为 docker 镜像以实现可扩展性:我正在使用此基础镜像:https://github.com/tiangolo/uwsgi-nginx-flask-docker/

    【讨论】:

    • 感谢您的解释。我可能想稍后在项目中回到这种方法。现在我的印象是这对我目前的需求来说太多了。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-06-19
    • 1970-01-01
    • 2017-03-19
    • 1970-01-01
    • 2013-04-28
    • 2015-08-04
    • 1970-01-01
    相关资源
    最近更新 更多