【问题标题】:Share variables in real time. Multiprocess Python实时共享变量。多进程 Python
【发布时间】:2021-10-11 09:53:32
【问题描述】:

我有 2 个进程:进程 A 生成数据,进程 B 使用进程 A 的值执行数学计算。该过程通过 Python 的 Multiprocess 库进行同步,如下面的代码所示。到目前为止,我共享变量的方式是通过提供Multiprocess 库的数据类型valueArray。如果我了解这些数据类型的工作原理,则必须等待进程完成使用共享变量,因此,通过这种方式,我明白我不能立即使用进程 A 生成的新值(或数组)。

产生数据的进程A是:

def process_A(processid, textid, texttitle, values):
    
    iters = 10
    
    h1 = new_data_incoming()
    data  = h1.text.split(" ")

    p_var1, p_var2 = 0, 0
    i = 0
    while i < iters:
        
        a_var1 = float(data[1].replace(',',''))
        a_var2 = float(data[1].replace(',',''))
        if a_var1 != p_var1:
            values[0] = a_var1
            values[1] = a_var2
            values[2] = float(data[3].replace(',',''))
            values[3] = float(data[4].replace(',',''))
            
            if data[5][0] == "+":
                values[4] = float(data[5][1::])
            else:
                values[4] = -float(data[5][1::])
                
            if data[6][0] == "+":
                values[5] = float(data[6][1:-1])
            else:
                values[5] = -float(data[6][1:-1])
            
            p_var1 = a_var1
            p_var2 = a_var2
            i = i + 1

控制进程A启动并接收数据的进程B是:

def process_B():
    ids = getids()
    
    key2 = "stocastic_process"
    
    id2  = ids[key2]
    
    values = Array('d', range(6))
    
    process_A_launched = Process(target=process_A, args=(2, id2, key2, values2))
    
    process_A_launched.start()
    print("Process with id %i has started: stocastic %s" % (2, key2))
    
    process_A_launched.join(timeout=0)
    
    finished = False
    fnsh = False
    
    while finished == False:
            
        if not process_A_launched.is_alive() and fnsh == False:
            print("process %i has finished!" % (2))
            fnsh = True
            
        if fnsh == True:
            finished = True
    
    print("All process have finished!")

    do_somethig_with_values(values)

所以,我想尽快使用生成的新数据。总是认为进程 A 生成了一个新数据,在进程 B 中使用它,因为我的意图是进程 A 一直在运行。也许我需要使用管道?

在 C 中,我在 POSIX 中使用过管道和中断,但在 Python 中我不知道这一点。

谢谢!

【问题讨论】:

  • 如何使用阻塞队列让 A 将数据和多个消费者 B 放在队列上以使用值。如果您想始终使用最新值,那么您可以使用 Stack 以类似方式代替 Queue。

标签: python multiprocessing pipe


【解决方案1】:

使用阻塞互斥体 (Lock()) 自己处理线程安全或使用队列,具体取决于您必须发送多少数据。

使用互斥锁

from multiprocessing import Lock
from time import sleep
from collections import dequeue
data = dequeue()
lock = Lock()

def producer():
    with lock:
        data.append(stuff)
    ...
    with lock:
        data.append(stuff)

def launcher_consumer():
    start_producer()
    while not len(data):
        sleep(0.001)
    with lock:
        do_stuff(data.pop_left())

然而,到此为止……我们几乎重新实现了一个队列。如果我有一个(或两个)位数据要发送,我只会使用互斥锁。 (在第一种情况下,我不会使用出队,只是一个普通的 var。)

使用队列

from multiprocessing import Queue

data = Queue()

def producer():
    data.put(stuff)
    ...
    data.put(stuff)

def launcher_consumer():
    start_producer()
    val = data.get(timeout=None) # set timeout if you need it
    do_stuff(val)
    val = data.get()
    do_stuff(val)

有关任一解决方案的详细信息,请参阅多处理文档。

参考

https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes

【讨论】:

    猜你喜欢
    • 2021-09-23
    • 1970-01-01
    • 1970-01-01
    • 2020-06-12
    • 2017-09-10
    • 2019-05-28
    • 2018-06-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多