【问题标题】:Handling an exception and continuing a loop处理异常并继续循环
【发布时间】:2019-09-05 23:20:46
【问题描述】:

我正在用 Python 3.7 编写一个 ETL 框架,它使用函数作为带有特殊装饰器的“任务”。这些任务中的大多数都运行一个循环。如果在循环中引发异常,我希望函数通过记录有关失败的数据来处理此异常,并继续循环。

这是我目前所拥有的一个简化示例:

class TaskExecutionError(RuntimeError):
    def __init__(self, msg="", context={}):
        self.msg = msg
        self.context = context

    def __str__(self):
        return self.msg or "Error executing task."


def task(fn):
    @wraps(fn)
    def _wrapper(*args, **kwargs):
        start_ts = datetime.utcnow()
        try:
            return fn(*args, **kwargs)

        except TaskExecutionError as e:
            logger.exception(f"Task execution error will be logged: {e}.")
            fail_data = {
                    "task_name": fn.__name__,
                    "args": list(args),
                    "kwargs": kwargs,
                    "context": e.context,
                    "fail_message": str(e),
                    "fail_time": str(datetime.utcnow()),
                    # etc.
                }
            )
            # Write failure data in an object store

        finally:
            end_ts = datetime.utcnow()
            logger.info(f"*** Wallclock: {end_ts - start_ts}.")

    _wrapper.is_task = True
    return _wrapper


@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
    """
    Test handling failures.
    """
    for i in range(10):
        if i % 3:
            raise TaskExecutionError(context={"i": i})
        else:
            print("All's well")

就我看到正在打印和保存的消息而言,这很好用,但是当然,一旦引发第一个异常,执行就会中断。

我应该如何解决这个问题以便继续执行?

似乎我不能使用非常方便的异常机制,我可能必须设计一个自定义的handle_failure() 函数左右。但是我不确定将函数装饰器的上下文传递给 handle failure() 函数的最佳方法,而我从装饰函数中调用它。

由于我将在几个@task 装饰函数中使用这种机制,因此如果可能的话,我希望有一个轻量级的调用,而不需要很多参数。

感谢您提出的任何建议。

【问题讨论】:

    标签: python-3.x exception python-decorators


    【解决方案1】:

    我使用inspect 解决了这个问题,我不喜欢经常使用它,但在这里似乎很有必要:

    def task(fn):
        @wraps(fn)
        def _wrapper(*args, **kwargs):
            start_ts = datetime.utcnow()
            try:
                return fn(*args, **kwargs)
    
            finally:
                end_ts = datetime.utcnow()
                logger.info(f"*** Wallclock: {end_ts - start_ts}.")
    
        _wrapper.is_task = True
    
    
    def handle_task_failure(exc, local_ctx={}):
        caller_frame = inspect.currentframe().f_back
        wrapper_frame = caller_frame.f_back
        caller_ctx = wrapper_frame.f_locals
        print(f"Context: {caller_ctx}")
        logger.exception(f"Task execution error will be logged: {exc}.")
        fail_data = {
                "start_time": caller_ctx.get("start_ts"),
                "runid": caller_ctx.get("runid"),
                "task_name": caller_ctx.get("fn").__name__,
                "args": list(caller_ctx.get("args")),
                "kwargs": caller_ctx.get("kwargs"),
                "context": local_ctx,
                "fail_message": str(exc),
                "fail_time": str(datetime.utcnow()),
                "traceback": format_stack(caller_frame),
        }
        # Save failure data in object store
    
    @task
    def test_fail_log(a, b, c, kwa=1, kwb=2):
        """
        Test handling failures.
        """
        for i in range(10):
            try:
                if i % 3:
                    raise RuntimeError("All's borked.")
                else:
                    print("All's well.")
            except Exception as exc:
                handle_task_failure(exc, {"i": i})
    
    

    另一方面,我不需要自定义异常类和处理失败的调用,这在几个有趣的重复中非常轻量级。

    【讨论】:

      猜你喜欢
      • 2019-01-15
      • 1970-01-01
      • 2013-11-03
      • 1970-01-01
      • 1970-01-01
      • 2013-02-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多