【问题标题】:python3 asyncio await on callbackpython3异步等待回调
【发布时间】:2018-07-13 15:59:59
【问题描述】:

我正在尝试弄清楚如何使用 rospy actionlib 和 asyncio 异步等待操作结果。为此我尝试了 编写一个动作执行生成器,但现在还没有成功。 我的想法是为动作的done_callback 添加一个 asyncio 未来 但未来似乎永远不会结束。

代码在这里:

def _generate_action_executor(self, action):

    async def run_action(goal):
        action_done = asyncio.Future()

        def done_callback(goal_status, result, future):
            status = ActionLibGoalStatus(goal_status)
            print('Action Done: {}'.format(status))
            future.set_result(result)

        action.send_goal(goal,
                         lambda x, y: done_callback(x,
                                                    y,
                                                    action_done))
        try:
            result = await action_done
            #problem future never done
        except asyncio.CancelledError as exc:
            action.cancel()
            raise exc

        return result

    return run_action

async def do_some_other_stuff(action):
    //do stuff
    my_goal = MyActionRequest('just do it')

    run_action = self._generate_action_executor(action)

    response = await run_action(my_goal)

    return response


if __name__ == "__main__":
    action = actionlib.SimpleActionClient('my_action',
                                           MyAction)

    try:
        loop = asyncio.get_event_loop()

        loop.run_until_complete(do_some_other_stuff(action))
    finally:
        loop.close()

【问题讨论】:

  • 回调是否在另一个线程中运行?如果是,您必须使用loop.call_soon_threadsafe,例如loop.call_soon_threadsafe(done_callback, x, y, action_done)
  • 是的,它被另一个线程调用。怎么比就等着呢?
  • 嗨文森特我对你的回答有点困惑。使用 loop.call_soon_threadsafe 事件循环将调用 done_callback 对吗?但在我的情况下,该操作将调用回调。唯一的问题是我不知道如何正确注册未来,以便我可以等待它并获得结果。

标签: python-3.x asynchronous python-asyncio rospy


【解决方案1】:

有了文森特的想法,

我实际上找到了解决问题的方法:

def _generate_action_executor(action):
    async def run_action(goal):
        loop = asyncio.get_event_loop()
        action_done = loop.create_future()

        def done_callback(goal_status, result, future, loop):
            status = ActionLibGoalStatus(goal_status)
            print('Action Done: {}'.format(status))
            loop.call_soon_threadsafe(future.set_result(result))

        action.send_goal(goal,partial(done_callback, future=action_done, loop=loop))
        try:
            await action_done
        except asyncio.CancelledError as exc:
            action.cancel()
            raise exc

        return action_done.result()

    return run_action

如果有人知道如何以更智能的方式实现它,请 通过使用来分享这些知识。

最佳曼纽尔

【讨论】:

    【解决方案2】:

    请记住,asyncio 旨在在单个线程中运行。

    如果程序需要与其他线程交互,您必须使用其中一个专用函数:

    这是一个简化的例子:

    async def run_with_non_asyncio_lib(action, arg):
        future = asyncio.Future()
        loop = asyncio.get_event_loop()
    
        def callback(*args):
            loop.call_soon_threasafe(future.set_result, args)
    
        non_asyncio_lib.register_callback(action, arg, callback)
        callback_args = await future
        return process(*callback_args)
    

    另外,loop.run_in_executor 提供了一种通过在自己的线程中运行给定函数来与非异步库交互的方法:

    async def run_with_non_asyncio_lib(action, arg):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None, non_asyncio_lib.run, action, arg)
        result = await future
        return result
    

    【讨论】:

      【解决方案3】:

      完整的测试代码(供需要的用户使用)
      // 基于@user3851038 的回答和
      // asyncio.wait_for() src 实现


      
      import asyncio
      # from asyncio import exceptions
      from datetime import datetime
      
      # wait a callback called
      async def test_wait_callback():
          loop = asyncio.get_event_loop()
          waiter = loop.create_future()
      
          ret_code = None
          def callback():
              nonlocal ret_code
              ret_code = 123
              print(datetime.now(), 'callback called:', ret_code)
              if not waiter.done(): waiter.set_result(None)  # <--- tell done
      
          print(datetime.now(), 'do_stuff_with_callback call')
          await do_stuff_with_callback(callback)
          print(datetime.now(), 'do_stuff_with_callback return')
      
          try:
              ret = await waiter                             # <--- wait done
              print(datetime.now(), 'callback done  :', ret_code)
          except asyncio.exceptions.CancelledError:
              pass
      
      
      # run something in main loop (not current task), with callback
      async def do_stuff_with_callback(cb):
          async def task():
              await asyncio.sleep(2) # 2 sec
              if callable(cb): cb()
      
          loop = asyncio.get_event_loop()
          loop.create_task(task())
      
      
      if __name__ == '__main__':
          asyncio.run(test_wait_callback())
      
          # loop = asyncio.get_event_loop()
          # loop.run_until_complete(test_wait_callback())
      
      
      
      

      输出:

      2021-12-04 18:09:36.400958 do_stuff_with_callback call
      2021-12-04 18:09:36.400996 do_stuff_with_callback return
      2021-12-04 18:09:38.401790 callback called: 123
      2021-12-04 18:09:38.401974 callback done  : 123
      
      

      【讨论】:

        猜你喜欢
        • 2021-06-30
        • 2011-05-10
        • 2018-09-22
        • 2017-06-29
        • 2019-11-15
        • 1970-01-01
        • 1970-01-01
        • 2021-10-12
        • 2017-09-02
        相关资源
        最近更新 更多