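You can use multiprocessing to run the simulations in child processes, and take advantage of the copy-on-write benefits of forking so that the data is unpickled/processed only once, at the start: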
import multiprocessing
import pickle

# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork')

# Load data once, in the parent process
with open(DATA_ROOT + pickle_name, 'rb') as f:
    data = pickle.load(f)

def _run_simulation(_):
    # Wrapper for `run_simulation` that takes one argument. The function passed
    # into `multiprocessing.Pool.map` must take one argument.
    run_simulation()

with mp.Pool() as pool:
    pool.map(_run_simulation, range(num_simulations))
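To make the inheritance concrete, here is a minimal sketch showing that a forked child sees a module-level object the parent created before the fork, without loading it again. The names `child_reports` and `check_inheritance` and the 1000-element list are hypothetical stand-ins for the real data, and the sketch assumes a platform where the fork start method is available.

```python
import multiprocessing
import os

# Forking is what lets the child start with a copy of the parent's memory.
mp = multiprocessing.get_context('fork')

# Hypothetical stand-in for the expensive unpickled data.
data = None


def child_reports(conn):
    # Runs in the child. `data` is already populated here because the child
    # was forked after the parent assigned it.
    conn.send((os.getpid(), len(data), sum(data)))
    conn.close()


def check_inheritance():
    global data
    data = list(range(1000))  # "load" once, in the parent
    receiver, sender = mp.Pipe(duplex=False)
    proc = mp.Process(target=child_reports, args=(sender,))
    proc.start()
    sender.close()  # the parent only reads from the pipe
    child_pid, length, total = receiver.recv()
    proc.join()
    # True if the report really came from a different process
    return child_pid != os.getpid(), length, total


if __name__ == '__main__':
    print(check_inheritance())
```

Nothing is pickled to hand `data` to the child; only the small result tuple travels back through the pipe.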
If you want to parameterize each simulation run, you can do so like this:
import multiprocessing
import pickle

# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork')

# Load data once, in the parent process
with open(DATA_ROOT + pickle_name, 'rb') as f:
    data = pickle.load(f)

with mp.Pool() as pool:
    simulations = ('arg for simulation run', 'arg for another simulation run')
    pool.map(run_simulation, simulations)
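This way the run_simulation function is passed the values from the simulations tuple, which lets each simulation run with different parameters, or simply gives each run a name or ID number for logging/saving purposes.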
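As a sketch of the ID idea, each element of the iterable can be a tuple that bundles a run ID with that run's parameters. The `run_simulation` body, the `scale` parameter, and the `run_all` helper below are hypothetical placeholders, not the real simulation.

```python
import multiprocessing

# Assumes the fork start method is available on this platform.
mp = multiprocessing.get_context('fork')


def run_simulation(params):
    # Hypothetical stand-in for the real simulation: unpack the run ID and a
    # single numeric parameter, and return the ID alongside the result so the
    # caller can tell the runs apart.
    run_id, scale = params
    return run_id, scale * 2


def run_all(simulations):
    with mp.Pool(processes=2) as pool:
        # `map` returns results in the same order as the inputs
        return pool.map(run_simulation, simulations)


if __name__ == '__main__':
    for run_id, result in run_all([(0, 1.5), (1, 2.0)]):
        print(f'run {run_id}: {result}')
```

Returning the ID with the result keeps logging and saving simple even if the work itself finishes out of order inside the pool.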
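This entire approach relies on forking being available. For more information about using fork with Python's built-in multiprocessing library, see the docs about contexts and start methods. You may also want to consider using the forkserver multiprocessing context (by using mp = multiprocessing.get_context('forkserver')) for the reasons described in the docs.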
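Since everything here depends on fork being available, it can help to fail early with a clear message on platforms that lack it. A minimal sketch, where `get_fork_context` is a hypothetical helper name:

```python
import multiprocessing


def get_fork_context():
    # `get_all_start_methods` lists the start methods this platform supports.
    available = multiprocessing.get_all_start_methods()
    if 'fork' not in available:
        raise RuntimeError(
            f"The 'fork' start method is not available here; found {available}"
        )
    return multiprocessing.get_context('fork')


if __name__ == '__main__':
    mp = get_fork_context()
    print(mp.get_start_method())
```

The sketch deliberately does not fall back to forkserver. There, children are forked from a separate server process rather than from the process that loaded the data, so whether the loaded data ends up shared depends on how that server is set up.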
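This approach can also be adapted if you do not want to run the simulations in parallel. The key point is that, to process the data only once, you must call run_simulation in the process that processed the data, or in one of its child processes.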
For instance, if you wanted to edit what run_simulation does, and then run it again at your command, you could do so with code resembling the following:
main.py:
import multiprocessing
from multiprocessing.connection import Connection

from data import load_data

# Load/process data in the parent process
load_data()
# Now child processes can access the data nearly instantaneously

# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork')  # Consider using 'forkserver' instead

# This is only ever run in child processes
def load_and_run_simulation(result_pipe: Connection) -> None:
    # Import `run_simulation` here to allow it to change between runs
    from simulation import run_simulation
    # Ensure that simulation has not been imported in the parent process, as if
    # so, it will be available in the child process just like the data!
    try:
        run_simulation()
    except Exception as ex:
        # Send the exception to the parent process
        result_pipe.send(ex)
    else:
        # Send this because the parent is waiting for a response
        result_pipe.send(None)

def run_simulation_in_child_process() -> None:
    result_pipe_output, result_pipe_input = mp.Pipe(duplex=False)
    proc = mp.Process(
        target=load_and_run_simulation,
        args=(result_pipe_input,)
    )
    print('Starting simulation')
    proc.start()
    # Close the parent's copy of the write end; while the parent still holds
    # it open, `recv` cannot see EOF if the child dies without sending.
    result_pipe_input.close()
    try:
        # The `recv` below will wait until the child process sends something, or
        # will raise `EOFError` if the child process crashes suddenly without
        # sending an exception (e.g. if a segfault occurs)
        result = result_pipe_output.recv()
        if isinstance(result, Exception):
            raise result  # raise exceptions from the child process
        proc.join()
    except KeyboardInterrupt:
        print("Caught 'KeyboardInterrupt'; terminating simulation")
        proc.terminate()
    print('Simulation finished')

if __name__ == '__main__':
    while True:
        choice = input('\n'.join((
            'What would you like to do?',
            '1) Run simulation',
            '2) Exit\n',
        )))
        if choice.strip() == '1':
            run_simulation_in_child_process()
        elif choice.strip() == '2':
            exit()
        else:
            print(f'Invalid option: {choice!r}')
data.py:
from functools import lru_cache
import pickle

# <obtain 'DATA_ROOT' and 'pickle_name' here>

# Cache the result so the data is only loaded once per process tree
@lru_cache(maxsize=None)
def load_data():
    with open(DATA_ROOT + pickle_name, 'rb') as f:
        return pickle.load(f)
simulation.py:
from data import load_data

# This call will complete almost instantaneously if `main.py` has been run
data = load_data()

def run_simulation():
    # Run the simulation using the data, which will already be loaded if this
    # is run from `main.py`.
    # Anything printed here will appear in the output of the parent process.
    # Exceptions raised here will be caught/handled by the parent process.
    ...
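The three files detailed above should all be in the same directory, alongside an __init__.py file that can be empty. The main.py file can be renamed to whatever you like, and is the primary entry point for this program. You can run simulation.py directly, but that will result in a long time spent loading/processing the data, which was the problem you ran into initially. While main.py is running, the file simulation.py can be edited, as it is reloaded every time you run the simulation from main.py.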
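For macOS users: forking on macOS can be a bit buggy, which is why Python defaults to using the spawn method for multiprocessing on macOS, but still supports fork and forkserver there. If you run into crashes or multiprocessing-related issues, try adding OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES to your environment. See https://stackoverflow.com/a/52230415/5946921 for more details.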
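One way to add that variable without changing your shell configuration is to start the program from a small launcher that sets it for the new process only. This is a sketch under that assumption; `run_python_with_fork_safety_disabled` is a hypothetical helper name.

```python
import os
import subprocess
import sys


def run_python_with_fork_safety_disabled(python_args):
    # Copy the current environment and add the variable for the new process
    # only; the launcher's own environment is left untouched.
    env = dict(os.environ)
    env['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
    # Run the same Python interpreter with the given arguments
    return subprocess.run([sys.executable, *python_args], env=env)


# Example usage (assumes main.py is in the current directory):
#     run_python_with_fork_safety_disabled(['main.py'])
```

Setting the variable in the environment of a fresh process means it is in place before that process starts, rather than being changed partway through a running interpreter.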