【问题标题】:How to handle the exception thrown by the async method with observable?如何用 observable 处理 async 方法抛出的异常?
【发布时间】:2022-01-13 19:23:08
【问题描述】:

我有一个 observable,我想用异步方法订阅这个 observable,但是每次异步方法抛出异常时,即使我将 catch 代码放在 observable 定义中,订阅也会立即处理。下面的伪代码演示了这种情况:

[Fact]
public async Task Test()
{
    var observable = Observable.Create<int>(observer =>
    {
        try
        {
            Enumerable.Range(1, 10).ToList().ForEach(x =>
            {
                observer.OnNext(x);
            });
        }
        catch (Exception ex)
        {
           // get called after the exception is thrown 
            _testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}"); 
        }
        return Disposable.Create(() =>
        {
           // also get called after exception is thrown
            _testOutputHelper.WriteLine("Observable Dispose"); 
        });
    });

    Func<int, Task> handler = async (i) =>
     {
         // simulate the handler logic
         await Task.Delay(TimeSpan.FromSeconds(1));
         // throw the exception to test 
         throw new Exception($"{i}");
     };

    observable.Subscribe(x=>handler(x).Wait());

    await Task.Delay(TimeSpan.FromSeconds(10));
}

从上面的代码中,我不明白为什么即使捕获异常也会调用 dispose 委托(出于某种原因,我必须在可观察定义中处理异常),有什么办法可以防止订阅当异步方法抛出异常时处理?

【问题讨论】:

  • “处置委托”是什么意思?你的意思是对 Disposable.Create 的调用吗?
  • @TobiasKildetoft 是的!Disposable.Create 的动作委托被调用
  • 它被调用是因为你吞下了异常而不是重新抛出它。出于以下所有代码的目的,可能从未抛出异常。

标签: c# .net async-await reactive-programming system.reactive


【解决方案1】:

您的代码中发生的事情是您使用 Observable.Create 并使用此代码填充 observable 的直接结果:

Enumerable.Range(1, 10).ToList().ForEach(x =>
{
    observer.OnNext(x);
});

Observable.Create 使用当前线程创建 observable,因此Enumerable.Range(1, 10).ToList().ForEach 立即在当前线程上执行,对OnNext 的调用立即执行handler(x).Wait()

不过,您会注意到,异常发生在传递给Subscribe 的委托中。内部有这样的代码:

catch (Exception exception)
{
    if (!autoDetachObserver.Fail(exception))
    {
        throw;
    }
    return autoDetachObserver;
}

在订阅中捕获异常,取消订阅 - 因此是 "Observable Dispose" 消息 - 然后重新抛出异常,这就是您的代码捕获它的地方。

现在,如果您想在 Rx 中正确执行此操作,则应避免使用 Observable.Create。这是一种创建可观察对象的诱人方式,但会带来麻烦。

改为这样做:

public async Task Test()
{
    Func<int, Task> handler = async (i) =>
     {
         // simulate the handler logic
         await Task.Delay(TimeSpan.FromSeconds(1));
         // throw the exception to test 
         throw new Exception($"{i}");
     };
 
    await
        Observable
            .Range(1, 10)
            .SelectMany(i => Observable.FromAsync(() => handler(i)))
            .LastOrDefaultAsync();
}

但是,当然,我们要处理异常。简单的方法是这样的:

public async Task Test()
{
    Func<int, Task> handler = async (i) =>
     {
         // simulate the handler logic
         await Task.Delay(TimeSpan.FromSeconds(1));
         // throw the exception to test 
         throw new Exception($"{i}");
     };
 
    await
        Observable
            .Range(1, 10)
            .SelectMany(i =>
                Observable
                    .FromAsync(() => handler(i))
                    .Catch<Unit, Exception>(ex =>
                    {
                        Console.WriteLine($"The exception is catch:{ex.ToString()}");
                        return Observable.Empty<Unit>();
                    }))
            .LastOrDefaultAsync();
}

现在输出 10 个异常错误并正常完成。

【讨论】:

  • 我更喜欢.Select(/*...*/).Merge(1),而不是.SelectMany(/*...*/),以防止并发调用异步handler。处理程序的设计可能没有考虑到并发性,因为它通常是同步处理程序的情况。
【解决方案2】:

您的代码完全按照您的指示行事。

捕获异常的目的是让您的程序可以继续运行而不会突然停止。这正是您的代码所做的:捕获异常,然后在 catch 块之后继续执行。

如果你想让它做其他事情,你有两个选择。

  1. 记录后重新抛出异常:
catch (Exception ex)
{
   // get called after the exception is thrown 
    _testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
    throw;
}

然后,任何名为 Test() 的代码都将负责捕获该异常(或不捕获)。

  1. 将 return 移到 try 块内,并在捕获到异常时返回其他内容:
try
{
    Enumerable.Range(1, 10).ToList().ForEach(x =>
    {
        observer.OnNext(x);
    });
    return Disposable.Create(() =>
    {
       // also get called after exception is thrown
        _testOutputHelper.WriteLine("Observable Dispose"); 
    });
}
catch (Exception ex)
{
   // get called after the exception is thrown 
    _testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
    
    return //something else
}

阅读 Microsoft 在 Exception Handling 上的文档可能会对您有所帮助。

【讨论】:

  • 这个答案确实没有对实际提出的问题进行任何讨论 - “有没有办法防止在异步方法抛出异常时处理订阅?”
  • 它也没有讨论使用 Rx 时如何处理异常的复杂性。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-04-03
  • 1970-01-01
相关资源
最近更新 更多