【问题标题】:Simpy Store Batch ProcessingSimpy 商店批处理
【发布时间】:2016-07-01 17:27:31
【问题描述】:

我正在尝试创建一个生产者/消费者模拟,消费者在其中批量处理项目。问题是 Store.get() 函数一被调用就会从 Store 中删除项目,但我需要它等到我调用了 yield:

import simpy

def producer(env, Q):
    item = 0
    while True:
        yield Q.put(item)
        print('Submit item:%d'%item)
        item += 1

def consumer(env, Q):
    while True:
        yield env.timeout(20)
        events = [Q.get() for i in range(4)]
        items = yield env.all_of(events)
        print([items for items in items.values()])


env = simpy.Environment()
maxQD = 2
Q = simpy.Store(env, capacity=maxQD)

env.process(producer(env, Q))
env.process(consumer(env, Q))
env.run(until=500)

产生以下输出:

Submit item:0
Submit item:1
Submit item:2
Submit item:3
Submit item:4
[0, 1, 2, 3]
Submit item:5
Submit item:6
Submit item:7
Submit item:8
[4, 5, 6, 7]
Submit item:9
Submit item:10
Submit item:11
Submit item:12
[8, 9, 10, 11]
...

当 maxQD 设置为 2 时,我预计会:

Submit item:0
Submit item:1

消费者一直阻塞,直到成功获得 4 个项目,而生产者无法添加超过 2 个。

你可以通过检查 len(Q.items) 来解决这个问题:

import simpy

def producer(env, Q):
    item = 0
    while True:
        yield Q.put(item)
        print('Submit item:%d'%item)
        item += 1

def consumer(env, Q):
    while True:
        yield env.timeout(20)
        if len(Q.items) >= 4:
            events = [Q.get() for i in range(4)]
            items = yield env.all_of(events)
            print([items for items in items.values()])


env = simpy.Environment()
maxQD = 4
Q = simpy.Store(env, capacity=maxQD)

env.process(producer(env, Q))
env.process(consumer(env, Q))
env.run(until=500)

但是你仍然会得到令人沮丧的行为,即 get() 在 yield 之前删除项目,这使得它看起来像 5 个项目被添加到 Q(注意 maxQD 更改为 4):

Submit item:0
Submit item:1
Submit item:2
Submit item:3
Submit item:4
[0, 1, 2, 3]

【问题讨论】:

  • 您观察到的是预期的行为。 env.all_of() 不会等到所有 get 事件可以成功,而是等到它们都成功。您传递给 env.all_of() 的每个单独事件都是独立的,并将尽快触发。所以这个例子的语义不是“等到商店里有 4 件商品”,而是“等到你最终从商店里拿出 4 件商品”。

标签: simpy


【解决方案1】:

通过子类化Store解决:

class BatchGet(simpy.resources.base.Get):
    def __init__(self, resource, count):
        self.count = count
        super().__init__(resource)

class BatchStore(simpy.resources.store.Store):
    get = simpy.core.BoundClass(BatchGet)

    def _do_put(self, event):
        if len(self.items) + len(event.item) <= self._capacity:
            self.items.extend(event.item)
            event.succeed()   

    def _do_get(self, event):
        count = event.count
        if len(self.items) >= count:
            ret = self.items[:count]
            self.items = self.items[count:]
            event.succeed(ret)

Puts 必须接受一个列表(因为它是一批项目),并返回一个列表。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-10-28
    • 2021-12-30
    • 1970-01-01
    • 1970-01-01
    • 2021-11-15
    • 1970-01-01
    • 2017-01-20
    • 2014-01-13
    相关资源
    最近更新 更多