【问题标题】:How can I iterate a SimPy simulation in parallel?如何并行迭代 SimPy 模拟?
【发布时间】:2019-10-02 13:54:21
【问题描述】:

我有一个 SimPy 模型,它返回一个我想多次复制的随机结果。每个复制都是独立的,因此为了使其更快,我想并行运行它们。我已经尝试过 Python 的 multiprocessingPathos multiprocessing 和 joblib Parallel,但每种方法都会出现相同的错误:TypeError: can't pickle generator objects。有什么办法可以避免这个错误并并行运行模拟?

SimPy 依赖于生成器,如 here 所述,因此无法避免使用它们。

【问题讨论】:

    标签: python parallel-processing simpy


    【解决方案1】:

    错误很好地描述了问题。在您发送给子进程的对象中的某个地方,一个生成器潜伏着,大概在函数参数中。是否可以将此生成器转换为列表?

    例如,以下会引发您提到的错误:

    from multiprocessing import Pool
    
    def firstn(n):
        k = 0
        while k < n:
            yield k
            k += 1
    
    if __name__ == "__main__":
        p = Pool(2)
        print(p.map(firstn, [1, 2, 3, 4]))
    

    但这一个有效:

    from multiprocessing import Pool
    
    def firstn(n):
        k = 0
        while k < n:
            yield k
            k += 1
    
    def wrapped(n):
        return list(firstn(n))
    
    if __name__ == "__main__":
        p = Pool(2)
        print(p.map(wrapped, [1, 2, 3, 4]))
    

    【讨论】:

    • 在 SimPy 中,每个进程都是生成模拟事件的生成器,因此它们实际上是不可能避免的。
    • 我明白了。好吧,可能仍然有聪明的方法可以绕过它们。我用一个例子编辑了我的答案。
    • 我有这样的想法,但我无法将它拼凑起来。我试试看。
    • 当我并行运行独立函数时,您的示例工作正常,但当我尝试使用类方法时遇到同样的错误。有什么建议吗?
    • 技术上的答案是因为它们不能被腌制。有关可以腌制的内容及其原因的详细说明,请参阅docs.python.org/3/library/…。类方法与实例状态交互,因此如果它们可以并行执行,结果可能是不可预测的。
    【解决方案2】:

    您需要在新进程中从头开始实例化环境,并注意仅使用 vanilla 类型作为要在 Pool 中映射的参数。这是一个重新设计的洗车示例(来自simpy 文档),它使用不同的种子运行 4 次并行模拟,并打印每种情况下洗了多少辆汽车。

    import multiprocessing as mp
    import simpy
    import random
    
    
    NUM_MACHINES = 2  # Number of machines in the carwash
    WASHTIME = 5      # Minutes it takes to clean a car
    T_INTER = 7       # Create a car every ~7 minutes
    SIM_TIME = 20     # Simulation time in minutes
    
    
    class Carwash(object):
        """A carwash has a limited number of machines (``NUM_MACHINES``) to
        clean cars in parallel.
    
        Cars have to request one of the machines. When they got one, they
        can start the washing processes and wait for it to finish (which
        takes ``washtime`` minutes).
    
        """
        def __init__(self, env, num_machines, washtime):
            self.env = env
            self.machine = simpy.Resource(env, num_machines)
            self.washtime = washtime
    
        def wash(self, car):
            """The washing processes. It takes a ``car`` processes and tries
            to clean it."""
            yield self.env.timeout(WASHTIME)
    
    
    def car(env, name, cw):
        """The car process (each car has a ``name``) arrives at the carwash
        (``cw``) and requests a cleaning machine.
    
        It then starts the washing process, waits for it to finish and
        leaves to never come back ...
    
        """
        with cw.machine.request() as request:
            yield request
            yield env.process(cw.wash(name))
    
    
    def setup(env, num_machines, washtime, t_inter):
        """Create a carwash, a number of initial cars and keep creating cars
        approx. every ``t_inter`` minutes."""
        # Create the carwash
        carwash = Carwash(env, num_machines, washtime)
    
        # Create 4 initial cars
        for i in range(4):
            env.process(car(env, 'Car %d' % i, carwash))
    
        # Create more cars while the simulation is running
        while True:
            yield env.timeout(random.randint(t_inter - 5, t_inter + 5))
            i += 1
            env.i = i
            env.process(car(env, 'Car %d' % i, carwash))
    
    
    # additional wrapping function to be executed by the pool
    def do_simulation_with_seed(rs):
    
        random.seed(rs)  # This influences only the specific process being run
        env = simpy.Environment()  # THE ENVIRONMENT IS CREATED HERE, IN THE CHILD PROCESS
        env.process(setup(env, NUM_MACHINES, WASHTIME, T_INTER))
    
        env.run(until=SIM_TIME)
    
        return env.i
    
    
    if __name__ == '__main__':
        seeds = range(4)
        carwash_pool = mp.Pool(4)
        ncars_by_seed = carwash_pool.map(do_simulation_with_seed, seeds)
        for s, ncars in zip(seeds, ncars_by_seed):
            print('seed={} --> {} cars washed'.format(s, ncars))
    
    

    【讨论】:

    • 不能轻松做的是用 deepcopy 或 pickle 在飞行中分叉一个现有的环境,目的是调整其参数并在不重新运行所有内容的情况下执行不同的场景,理想情况下在平行下。见这里:stackoverflow.com/questions/58415397/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-20
    • 2017-07-12
    相关资源
    最近更新 更多