【问题标题】:How to share state when using concurrent futures使用并发期货时如何共享状态
【发布时间】:2019-05-06 18:16:35
【问题描述】:

我知道使用传统的多处理库我可以声明一个值并在进程之间共享状态。

https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#sharing-state-between-processes

当使用较新的 concurrent.futures 库时,如何在我的进程之间共享状态?

import concurrent.futures

def get_user_object(batch):
    # do some work
    counter = counter + 1
    print(counter)

def do_multithreading(batches):
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        threadingResult = executor.map(get_user_object, batches)

def run():
    data_pools = get_data()
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
        processResult = executor.map(do_multithreading, data_pools)
    end = time.time()
    print("TIME TAKEN:", end - start)

if __name__ == '__main__':
    run()

我想保持这个计数器的同步值。

在之前的库中,我可能使用过multiprocessing.ValueLock

【问题讨论】:

  • 这与新的 concurrent.fututres 库特别相关

标签: python python-3.x python-multiprocessing python-multithreading


【解决方案1】:

您可以将initializerinitargs 传递给ProcessPoolExecutor,就像传递给multiprocessing.Pool 一样。这是一个例子:

import concurrent.futures
import multiprocessing as mp


def get_user_object(batch):
    with _COUNTER.get_lock():
        _COUNTER.value += 1
        print(_COUNTER.value, end=' ')


def init_globals(counter):
    global _COUNTER
    _COUNTER = counter


def main():
    counter = mp.Value('i', 0)
    with concurrent.futures.ProcessPoolExecutor(
        initializer=init_globals, initargs=(counter,)
    ) as executor:
        for _ in executor.map(get_user_object, range(10)):
            pass
    print()


if __name__ == "__main__":
    import sys
    sys.exit(main())

用途:

$ python3 glob_counter.py 
1 2 4 3 5 6 7 8 10 9 

地点:

  • for _ in executor.map(get_user_object, range(10)): 允许您遍历每个结果。在这种情况下,get_user_object() 返回None,因此您实际上没有任何事情要处理;您只需pass,无需采取进一步行动。
  • 最后一个 print() 调用为您提供了一个额外的换行符,因为最初的 print() 调用不使用换行符 (end=' '')

【讨论】:

猜你喜欢
  • 2010-10-15
  • 2012-03-11
  • 2021-12-17
  • 1970-01-01
  • 1970-01-01
  • 2017-06-08
  • 2020-08-02
  • 1970-01-01
  • 2023-03-22
相关资源
最近更新 更多