【问题标题】:Python asyncio restart coroutinePython asyncio 重启协程
【发布时间】:2020-12-29 19:54:09
【问题描述】:

我是一名新的 Python 程序员,正在尝试编写一个“机器人”来为自己在必发进行交易。 (雄心勃勃!!!!)

我出现的问题是——我有一个 asyncio 事件循环运行的基础知识,但我注意到如果其中一个协程在其过程中失败(例如 API 调用失败或 mongodb 读取),那么 asyncio事件循环只是继续运行,但忽略了一个失败的协程

我的问题是如何自动重新启动一个协程或处理错误以停止完整的 asyncio 循环,但目前一切运行似乎都没有注意到某些事情不正确并且其中一部分失败的事实。在我的情况下,在数据库读取不成功后,循环从未返回到“rungetcompetitionids”函数,并且即使它处于一个真正的循环中,它也从未再次返回到该函数 usergui 还没有功能,但只能在那里尝试 asyncio

谢谢 克莱夫

import sys
import datetime
from login import sessiontoken as gst
from mongoenginesetups.setupmongo import global_init as initdatabase
from asyncgetcompetitionids import competition_id_pass as gci
from asyncgetcompetitionids import create_comp_id_table_list as ccid
import asyncio
import PySimpleGUI as sg

sg.change_look_and_feel('DarkAmber')

layout = [
    [sg.Text('Password'), sg.InputText(password_char='*', key='password')],
    [sg.Text('', key='status')],
    [sg.Button('Submit'), sg.Button('Cancel')]
]
window = sg.Window('Betfair', layout)


def initialisethedatabase():
    initdatabase('xxxx', 'xxxx', xxxx, 'themongo1', True)


async def runsessiontoken():
    nextlogontime = datetime.datetime.now()
    while True:
        returned_login_time = gst(nextlogontime)
        nextlogontime = returned_login_time
        await asyncio.sleep(15)


async def rungetcompetitionids(compid_from_compid_table_list):
    nextcompidtime = datetime.datetime.now()
    while True:
        returned_time , returned_list = gci(nextcompidtime, compid_from_compid_table_list)
        nextcompidtime = returned_time
        compid_from_compid_table_list = returned_list
        await asyncio.sleep(10)


async def userinterface():
    while True:
        event, value = window.read(timeout=1)
        if event in (None, 'Cancel'):
            sys.exit()
        if event != "__TIMEOUT__":
            print(f"{event} {value}")
        await asyncio.sleep(0.0001)


async def wait_list():
    await asyncio.wait([runsessiontoken(), 
                       rungetcompetitionids(compid_from_compid_table_list), 
                       userinterface()
                      ])


initialisethedatabase()
compid_from_compid_table_list = ccid()
print(compid_from_compid_table_list)
nextcompidtime = datetime.datetime.now()
print(nextcompidtime)
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_list())
loop.close()

【问题讨论】:

  • 使用asyncio.gather(x, y, z) 代替asyncio.wait([x, y, z])。如果协程引发,gather 也会引发,从而停止循环。使用asyncio.wait() 是错误的,除非您检查返回值或有专门的用例,例如return_when=FIRST_COMPLETED
  • 感谢您,它已被纳入我的解决方案中

标签: python python-asyncio coroutine


【解决方案1】:

一个简单的解决方案是使用捕获Exception 的包装函数(或“主管”),然后盲目地重试该函数。更优雅的解决方案包括打印出异常和堆栈跟踪以进行诊断,并查询应用程序状态以查看尝试并继续是否有意义。例如,如果必发告诉您您的帐户未经授权,那么继续下去就没有任何意义。如果这是一个普遍的网络错误,那么立即重试可能是值得的。如果主管注意到它在短时间内重新启动了很多次,您可能还想停止重试。

例如。

import asyncio
import traceback
import functools
from collections import deque
from time import monotonic

MAX_INTERVAL = 30
RETRY_HISTORY = 3
# That is, stop after the 3rd failure in a 30 second moving window

def supervise(func, name=None, retry_history=RETRY_HISTORY, max_interval=MAX_INTERVAL):
    """Simple wrapper function that automatically tries to name tasks"""
    if name is None:
        if hasattr(func, '__name__'): # raw func
            name = func.__name__
        elif hasattr(func, 'func'): # partial
            name = func.func.__name__
    return asyncio.create_task(supervisor(func, retry_history, max_interval), name=name)


async def supervisor(func, retry_history=RETRY_HISTORY, max_interval=MAX_INTERVAL):
    """Takes a noargs function that creates a coroutine, and repeatedly tries
        to run it. It stops is if it thinks the coroutine is failing too often or
        too fast.
    """
    start_times = deque([float('-inf')], maxlen=retry_history)
    while True:
        start_times.append(monotonic())
        try:
            return await func()
        except Exception:
            if min(start_times) > monotonic() - max_interval:
                print(
                    f'Failure in task {asyncio.current_task().get_name()!r}.'
                    ' Is it in a restart loop?'
                )
                # we tried our best, this coroutine really isn't working.
                # We should try to shutdown gracefully by setting a global flag
                # that other coroutines should periodically check and stop if they
                # see that it is set. However, here we just reraise the exception.
                raise
            else:
                print(func.__name__, 'failed, will retry. Failed because:')
                traceback.print_exc()


async def a():
    await asyncio.sleep(2)
    raise ValueError


async def b(greeting):
    for i in range(15):
        print(greeting, i)
        await asyncio.sleep(0.5)


async def main_async():
    tasks = [
        supervise(a),
        # passing repeated argument to coroutine (or use lambda)
        supervise(functools.partial(b, 'hello'))
    ]
    await asyncio.wait(
        tasks,
        # Only stop when all coroutines have completed
        # -- this allows for a graceful shutdown
        # Alternatively use FIRST_EXCEPTION to stop immediately 
        return_when=asyncio.ALL_COMPLETED,
    )
    return tasks


def main():
    # we run outside of the event loop, so we can carry out a post-mortem
    # without needing the event loop to be running.
    done = asyncio.run(main_async())
    for task in done:
        if task.cancelled():
            print(task, 'was cancelled')
        elif task.exception():
            print(task, 'failed with:')
            # we use a try/except here to reconstruct the traceback for logging purposes
            try:
                task.result()
            except:
                # we can use a bare-except as we are not trying to block
                # the exception -- just record all that may have happened.
                traceback.print_exc()

main()

这将导致如下输出:

你好 0 你好 1 你好 2 你好 3 一次失败,将重试。失败是因为: 回溯(最近一次通话最后): 文件“C:\Users\User\Documents\python\src\main.py”,第 30 行,在主管中 返回等待函数() 文件“C:\Users\User\Documents\python\src\main.py”,第 49 行,位于 引发 ValueError 值错误 你好 4 你好 5 你好 6 你好 7 一次失败,将重试。失败是因为: 回溯(最近一次通话最后): 文件“C:\Users\User\Documents\python\src\main.py”,第 30 行,在主管中 返回等待函数() 文件“C:\Users\User\Documents\python\src\main.py”,第 49 行,位于 引发 ValueError 值错误 你好 8 你好 9 你好 10 你好 11 任务“a”失败。是否在重启循环中? 你好 12 你好 13 你好 14 异常=ValueError()> 失败: 回溯(最近一次通话最后): 文件“C:\Users\User\Documents\python\src\main.py”,第 84 行,在 main 任务.结果() 文件“C:\Users\User\Documents\python\src\main.py”,第 30 行,在主管中 返回等待函数() 文件“C:\Users\User\Documents\python\src\main.py”,第 49 行,位于 引发 ValueError 值错误

【讨论】:

  • 感谢以上两位 cmets。我设法得到了一个混合解决方案,上面的两个想法都派上用场
  • 有趣的解决方案。您是否有一个版本也可以处理具有多个参数的协程?
  • 按照 main_async 函数中的建议,使用 lambda 或 partial 来包装协程函数。
  • 感谢您的回复!我只是在查看您的代码并且不知道部分是什么意思,但这个评论也帮助我更好地理解。 TY
  • 我不太确定,但start_times.append(monotonic()) 的位置是否可能存在错误。如果将它放在while true 范围内,它可能会捕获非异常的时间。它不应该在except Exception: 范围内吗?
猜你喜欢
  • 1970-01-01
  • 2018-02-25
  • 2019-03-16
  • 1970-01-01
  • 2020-07-03
  • 2020-10-10
  • 1970-01-01
  • 1970-01-01
  • 2015-12-02
相关资源
最近更新 更多