【问题标题】:pytransitions, timeout, in-memory, persisting to databasepytransitions,超时,内存,持久化到数据库
【发布时间】:2021-04-07 13:07:27
【问题描述】:

关于我目前在我的一个项目中使用的 pyTransitions 包的问题。

我在一开始就在评估不同的软件包时进行了测试,其中还包括我知道将来某个地方需要的超时功能。

然后我逐渐选择使用 sqlalchemy 将我所有的有限状态机(实际上只是一个 id 和状态)持久化到磁盘上,并且只在需要触发转换时才重新加载它们——效果很好。

不幸的是,现在再次需要超时处理,甚至在尝试将其集成到我的代码中之前,我很确定这对我不起作用。 我想我说的很明显:为了在一组(可能更大的)fsms 上正确处理超时,它们必须作为活对象存在于内存中,而不仅仅是从数据库中加载?

这是您已经遇到的用例吗?是否有某种方法也可以访问此超时计数器,以便在任何时候通过适当的修复来持久化和重新加载它,以允许超时机制恢复正常,即使对象在 RAM 中没有存在时也是如此实际超时?

如果没有简单的内置替代方案,我想我会在 RAM 中创建一个常驻对象池,定期保存它们并在我的应用程序出现故障时重新加载它们? (我的具体场景使用 sqlalchemy,但我想这同样适用于泡菜)

为任何想法或建议提前欢呼和感谢 乔尔

【问题讨论】:

    标签: pytransitions


    【解决方案1】:

    目前还没有恢复超时的内置功能。但是,有一些方法可以将机器从(字典)配置转换为(字典)配置。该扩展名为MarkupMachine,并在FAQ notebook 中提及。

    我们需要的是一个 Timeout 状态类,它存储有关何时触发的信息,并且可以根据此信息恢复。我们还需要MarkupMachine 来处理这些自定义状态信息。 MarkupMachine._convert_models 将模型及其当前状态转换为字典,MarkupMachine._add_markup_model 将获取字典以再次实例化模型。因此,我们需要扩展这两种方法。

    我会偷工减料以保持代码简短并专注于概念。不过,没有任何假设是强制性的。我将假设a)您可以处理配置,因为您可以调整它们以从数据库中存储和检索。此外,我将假设 b)您的机器是您的有状态模型,c)您使用默认的 model_attribute'state',d)您不使用嵌套/分层机器和 e)您不传递重要的自定义信息当你触发一个事件时。最后,f) 你不介意进入恢复状态并且潜在的on_enter_<callbacks> 将被触发,并且 g) 你不需要毫秒(分数)维度的精度。这听起来很多。但同样,这一切都不会破坏交易,只是需要更复杂的案例处理。

    from transitions.extensions.markup import MarkupMachine
    from transitions.extensions.states import Timeout
    from transitions.core import EventData, Event
    import time
    from datetime import datetime
    
    
    class MarkupTimeout(Timeout):
    
        def __init__(self, *args, **kwargs):
            # Timeout expects a number but MarkupMachine passes values to states as strings right now
            kwargs['timeout'] = int(kwargs.get('timeout', 0))
            super(MarkupTimeout, self).__init__(*args, **kwargs)
            # we store trigger times in a dictionary with the model id as keys
            self.timeout_at = {}
            self.timeout = int(self.timeout)
    
        def resume(self, timeout_at, event_data):
            # since we want to give our MarkupMachine some time to instantiate we postulate that
            # the earliest possible trigger time is in a second.
            trigger_time = time.time() + 1
            timeout_at = trigger_time if timeout_at < trigger_time else timeout_at
            # we store the default timeout time ...
            tmp = self.timeout
            # ... and temporary override it with the delta of the intended trigger time and the current time
            self.timeout = timeout_at - time.time()
            # ... enter the state and trigger the creation of the timer
            self.enter(event_data)
            # restore the timeout for any future enter event
            self.timeout = tmp
    
        def enter(self, event_data):
            # a timeout will only be initiated if the timeout value is greater than 0
            if self.timeout > 0:
                # calculate the time when the timeout will trigger (approximately) ...
                timeout_time = time.time() + self.timeout
                # and store it in the previously created dictionary
                self.timeout_at[id(event_data.model)] = timeout_time
                print(f"I should timeout at: {datetime.utcfromtimestamp(timeout_time)}")
            super(MarkupTimeout, self).enter(event_data)
    
        def exit(self, event_data):
            super(MarkupTimeout, self).exit(event_data)
            # remove the timeout time when the state is exited
            self.timeout_at[id(event_data.model)] = None
    
    
    class DBMachine(MarkupMachine):
    
        # DBMachine will use this class when states are created
        state_cls = MarkupTimeout
    
        # we customize our model definition and add 'timeout_at' to it
        # usually MarkupMachine would iterate over all models but since we assume the model is just
        # the machine itself, we can skip that part
        def _convert_models(self):
            state = self.get_state(self.state)
            timeout_at = state.timeout_at.get(id(self), None)
            model_def = {'state': state.name,
                         'name': 'DBMachine',
                         'class-name': 'self',
                         'timeout_at': str(timeout_at) if timeout_at is not None else ''}
            return [model_def]
    
        def _add_markup_model(self, markup):
            initial = markup.get('state', None)
            timeout_at = markup.get('timeout_at', '')
            self.add_model(self, initial)
            if timeout_at:
                state = self.get_state(self.state)
                # as mentioned above, every configuration value is a string right now
                ms = float(timeout_at)
                # since we did not store event data, we need to create a temporary event with a minimal EventData object
                # that can be passed to state callbacks
                state.resume(ms, EventData(state=state,
                                           event=Event(name="resume", machine=self),
                                           machine=self,
                                           model=self,
                                           args=[],
                                           kwargs={}))
    
    
    # we pass a timeout only for 'pending'
    states = ['init', dict(name='pending', timeout=5, on_timeout='cancel'), 'done', 'cancelled']
    transitions = [
        dict(trigger='request', source='init', dest='pending'),
        dict(trigger='cancel', source='pending', dest='cancelled'),
        dict(trigger='result', source='pending', dest='done')
    ]
    
    m = DBMachine(states=states, transitions=transitions, initial='init')
    # transition to 'pending' and initiate timer
    m.request()
    assert m.is_pending()
    config = m.markup  # [1]
    # remove old machine
    del m
    # create new machine from configuration
    m2 = DBMachine(markup=config)
    assert m2.is_pending()
    time.sleep(10)
    assert m2.is_cancelled()
    

    配置 [1] 如下所示:

    { 'after_state_change': [],
      'auto_transitions': True,
      'before_state_change': [],
      'finalize_event': [],
      'ignore_invalid_triggers': None,
      'initial': 'init',
      'models': [ { 'class-name': 'self',
                    'name': 'DBMachine',
                    'state': 'pending',
                    'timeout_at': '1617958918.6320097'}],
      'prepare_event': [],
      'queued': False,
      'send_event': False,
      'states': [ {'name': 'init'},
                  {'name': 'pending', 'on_timeout': ['cancel'], 'timeout': '5'},
                  {'name': 'done'},
                  {'name': 'cancelled'}],
      'transitions': [ {'dest': 'pending', 'source': 'init', 'trigger': 'request'},
                       { 'dest': 'cancelled',
                         'source': 'pending',
                         'trigger': 'cancel'},
                       {'dest': 'done', 'source': 'pending', 'trigger': 'result'}]}
    
    

    我假设可以重新组织此配置以使 SQL 查询能够过滤即将发生的超时并在必要时实例化机器。 timeout_at 也可以存储为日期时间字符串而不是 unix 时间戳,如果这样可以使查询更容易。您也可以只存储 models 部分,而不是从配置中创建 DBMachine,而是以“通用”方式创建它:

    # reuse the states and transitions and only create the model from configuration
    # 'model=None' prevents the machine from adding itself as a model too early
    m2 = DBMachine(model=None, states=states, transitions=transitions, initial='init')
    m2._add_markup_model(config['models'][0])
    

    【讨论】:

      猜你喜欢
      • 2015-03-25
      • 1970-01-01
      • 2022-01-19
      • 2014-04-25
      • 1970-01-01
      • 1970-01-01
      • 2016-05-12
      • 1970-01-01
      • 2019-09-27
      相关资源
      最近更新 更多