【问题标题】:Pytransitions: AsyncMachine Sequential Resolution of Asynchronous Dependent CallbacksPytransitions:异步相关回调的 AsyncMachine 顺序解析
【发布时间】:2021-07-19 15:09:03
【问题描述】:

注意: 这个问题和Python的FSM库有关pytransitions

当方法回调在 prepare 或/和 before 或/和 中作为列表被提及时,我正在寻找一种按顺序解决方法回调的方法之后。我正在使用来自transitions.extensions.asyncioAsyncMachine 模块

预期结果:

1Done_2Done_3Done

获取:

None_3Done

复制当前情况的示例代码:

import asyncio
from transitions.extensions.asyncio import AsyncMachine


class Model:

    STATES = ['A', 'B']
    TRANSITIONS = [
        {'trigger': 'next', 'source': 'A', 'dest': 'B',
            'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
    ]

    def __init__(self, name, state='initial'):
        self.name = name
        self.state = state
        self.attribute_1 = None
        self.attribute_2 = None
        self.attribute_3 = None

    async def initialize1(self):
        await asyncio.sleep(1)  # This is expensive operation and will take some time.
        self.attribute_1 = '1Done'
        print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)

    async def initialize2(self):
        await asyncio.sleep(0.5)  # This is expensive operation and will take some time.
        self.attribute_2 = f'{self.attribute_1}_2Done'
        print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)

    async def initialize3(self):
        self.attribute_3 = f'{self.attribute_2}_3Done'
        print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)

    async def show_attributes(self):
        print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')


machine = AsyncMachine(
    model=None,
    states=Model.STATES,
    transitions=Model.TRANSITIONS,
    initial=None,
    queued='model'
    # queued=True
)


async def main():
    model1 = Model(name='Model1', state='A')
    machine.add_model(model1, initial=model1.state)
    await machine.dispatch('next')


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

正如代码'prepare': ['initialize1', 'initialize2', 'initialize3'] 中所示,我正在寻找一种方法,一旦解决了initialize1 就调用initialize2,一旦解决了initialize1 和initialize2 方法就调用initialize3。 目前,它们被并行调用,这是一个很好的功能,但如果有一种方法可以让它们按顺序执行/解析,那就太棒了。

当然,我可以再添加一个方法,例如initialize_all,然后在其中调用上述所有方法。但是想想我要不断添加多少新方法来处理现实世界的问题。我想让我的函数可重用且更小,仅用于特定任务。

【问题讨论】:

    标签: pytransitions


    【解决方案1】:

    我浏览了pytransitions 源代码,找到了两种方法来实现我正在寻找的功能

    如果我能提及我是如何实现我正在寻找的功能的,那就太好了。

    由于我一直在寻找一种方法来异步解析回调事件(默认情况下)和按要求顺序解析,我不得不重写 @ 的 callbacks 方法987654323@.

    方法一:

    import asyncio
    from functools import partial
    from transitions.extensions.asyncio import AsyncMachine
    
    
    class EnhancedMachine(AsyncMachine):
    
        async def callbacks(self, funcs, event_data):
            """ Overriding callbacks method:
                Get `parallel_callback` keyword argument to decide whether
                callback events should be resolved in parallel or in sequence.
            """
            parallel_callback = event_data.kwargs.get('parallel_callback', None)
            resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
            if parallel_callback is False:
                for func in resolved_funcs:
                    await func()
            else:
                await self.await_all(resolved_funcs)
    
    
    class Model:
    
        STATES = ['A', 'B']
        TRANSITIONS = [
            {'trigger': 'next', 'source': 'A', 'dest': 'B',
                'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
    
        ]
    
        def __init__(self, name, state='initial'):
            self.name = name
            self.state = state
            self.sequential_transition = True
            self.attribute_1 = None
            self.attribute_2 = None
            self.attribute_3 = None
    
        async def initialize1(self, ed):
            await asyncio.sleep(1)  # This is expensive operation and will take some time.
            self.attribute_1 = '1Done'
            print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)
    
        async def initialize2(self, ed):
            await asyncio.sleep(0.5)  # This is expensive operation and will take some time.
            self.attribute_2 = f'{self.attribute_1}_2Done'
            print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)
    
        async def initialize3(self, ed):
            self.attribute_3 = f'{self.attribute_2}_3Done'
            print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)
    
        async def show_attributes(self, ed):
            print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')
    
    
    machine = EnhancedMachine(
        model=None,
        states=Model.STATES,
        transitions=Model.TRANSITIONS,
        initial=None,
        send_event=True,  # this will pass EventData instance for each method.
        queued='model'
        # queued=True
    )
    
    
    async def main():
        model1 = Model(name='Model1', state='A')
        machine.add_model(model1, initial=model1.state)
    
        # Passing `parallel_callback` as False for synchronous events
        await machine.dispatch('next', parallel_callback=False)
    
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    

    缺点:

    1. 添加了send_event=True,并且所有方法定义都添加了额外的参数ed(event_data)来处理parallel_callback关键字参数。

    2. 转换回调需要传递parallel_callback=False,并且必须更改代码中所有可能的位置。

    3. 如果下一个transition必须由transition本身的定义来决定,那么关键字参数parallel_callback就不能传递(至少我不知道怎么做):

      TRANSITIONS = [
          {'trigger': 'next', 'source': 'A', 'dest': 'B',
              'prepare': [], 'before': [], 'after': ['next2']},
          {'trigger': 'next2', 'source': 'B', 'dest': 'C',
           'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
      ]
      

    方法二(我个人比较喜欢这种方式):

    在transitions的定义中,将相互依赖的回调分组在一起,应该按顺序解决。

    使用这种方法,最终的过渡看起来像这样

    TRANSITIONS = [
        {'trigger': 'next', 'source': 'A', 'dest': 'B',
         'prepare': [('initialize1', 'initialize2', 'initialize3')], 'before': [],
         'after': ['show_attributes']}
    ]
    

    说明:

    'prepare': [('callback1', 'callback2'), 'callback3']

    这里group1(callback1和callback2),group2(callback3)会异步(并行)解析。但是group1中的callback1和callback2会同步(依次)解析。

    重写的callbacks 方法现在看起来会略有不同,还有一个新的静态方法await_sequential

    class EnhancedMachine(AsyncMachine):
    
        async def callbacks(self, func_groups, event_data):
            """ Triggers a list of callbacks """
            resolved_func_groups = []
            for funcs in func_groups:
                if isinstance(funcs, (list, tuple)):
                    resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
                else:
                    resolved_funcs = [partial(event_data.machine.callback, funcs, event_data)]
                resolved_func_groups.append(resolved_funcs)
    
            # await asyncio.gather(*[self.await_sequential(funcs) for funcs in resolved_func_groups])
            await self.await_all([partial(self.await_sequential, funcs) for funcs in resolved_func_groups])
    
        @staticmethod
        async def await_sequential(funcs):
            return [await func() for func in funcs]
    

    缺点:

    1. 方法和方法调用的定义没有改变。
    2. 更改了一个地方并修复了所有地方。

    缺点:

    1. 您应该知道您的方法在做什么。有时,不需要的分组会导致事件的解决出现不必要的延迟。

    使用这两种方法,我得到了相同的期望输出:

    Model1 A -> Initialized1:  1Done
    Model1 A -> Initialized2:  1Done_2Done
    Model1 A -> Initialized3:  1Done_2Done_3Done
    Model1 B -> Showing all: 1Done_2Done_3Done
    

    我坚持使用第二种方法,但我很高兴知道实现此类功能的其他有效方法:)

    【讨论】:

      【解决方案2】:

      我认为您的“方法 2”看起来不错。如果您知道所有回调应该按顺序执行并且您根本不需要并行执行,您也可以直接覆盖 await_all

      class EnhancedMachine(AsyncMachine):
      
          @staticmethod
          async def await_all(callables):
              return [await func() for func in callables]
      

      如果您切换元组/列表的含义,您可以将代码稍微缩短为如下所示:

      class EnhancedMachine(AsyncMachine):
      
          async def callbacks(self, func_groups, event_data):
              results = []
              for funcs in func_groups:
                  if isinstance(funcs, (list, tuple)):
                      results.extend(await self.await_all(
                        [partial(event_data.machine.callback, func, event_data)
                         for func in funcs]
                      ))
                  else:
                      results.append(await self.callback(funcs, event_data))
              return results
      

      这启用了像[stage_1, (stage_2a, stage_2b, stage_2c), stage_3] 这样的回调注释,其中每个阶段按顺序执行,但子阶段被并行调用。

      【讨论】:

        猜你喜欢
        • 2020-04-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-06-04
        • 2011-08-18
        • 2012-03-15
        • 1970-01-01
        • 2021-12-07
        相关资源
        最近更新 更多