【问题标题】:Map errors to observable using C# ReactiveX使用 C# ReactiveX 将错误映射到 observable
【发布时间】:2016-12-23 01:51:49
【问题描述】:

我有一个可观察的MyObservable<Object> 可以将CustomExceptions 扔到哪里

private class CustomException : Exception

我想要做的是将CustomExceptions 转换为对象并在新的 observable 中发出这些对象。

这是我目前的解决方案,但我想知道是否可以在不必直接调用 Subject 的 onNext、onCompleted 或 onError 方法的情况下完成此操作。

var MySubject = new Subject<NewObject>();

MyObservable.Catch<Object, CustomException>(
            ex =>
            {
                NewObject o = new NewObject(ex.Message);
                MySubject.OnNext(o);
                return Observable.Empty<Object>();
            });

IObservable<IList<NewObject>> listObservable = MySubject.ToList();

编辑:谢谢 ibebbs!像魅力一样工作!

【问题讨论】:

    标签: c# exception observable reactivex


    【解决方案1】:

    您可以使用Materialize() 函数捕获和映射没有主题的异常,如下所示:

    var errorObservable = source
        .Select(projection)
        .Materialize()
        .Where(notification => notification.Kind == NotificationKind.OnError)
        .Select(notification => notification.Exception)
        .OfType<CustomException>()
        .Select(exception => new NewObject(exception.Message));
    

    Materialize 函数接受一个IObservable&lt;T&gt; 并将其映射到一个IObservable&lt;Notification&lt;T&gt;&gt;,其中每个通知都有一个KindOnNextOnErrorOnComplete。上面的 observable 只是查找带有 Kind`` of OnError and with the Exception being an instance of CustomException then projects these exceptions into anIObservable``` 的通知。

    这是一个显示此工作的单元测试:

    [Fact]
    public void ShouldEmitErrorsToObservable()
    {
        Subject<int> source = new Subject<int>();
        List<int> values = new List<int>();
        List<NewObject> errors = new List<NewObject>();
    
        Func<int, int> projection =
            value =>
            {
                if (value % 2 == 1) throw new CustomException("Item is odd");
    
                return value;
            };
    
        Func<CustomException, IObservable<int>> catcher = null;
    
        catcher = ex => source.Select(projection).Catch(catcher);
    
        var errorObservable = source
            .Select(projection)
            .Materialize()
            .Where(notification => notification.Kind == NotificationKind.OnError)
            .Select(notification => notification.Exception)
            .OfType<CustomException>()
            .Select(exception => new NewObject(exception.Message));
    
        var normalSubscription = source.Select(projection).Catch(catcher).Subscribe(values.Add);
        var errorSubscription = errorObservable.Subscribe(errors.Add);
    
        source.OnNext(0);
        source.OnNext(1);
        source.OnNext(2);
    
        Assert.Equal(2, values.Count);
        Assert.Equal(1, errors.Count);
    }
    

    但是,正如您在上面使用的解释性捕获机制中看到的那样,Rx 中的异常处理可能很难正确处理,甚至更难以优雅地处理。相反,请考虑Exceptions should be Exceptional,并且如果您预计会出现一类错误,并且您已经为其编写了自定义异常,那么该错误并不是真正的异常,而是必须处理这些错误的流程的一部分。

    在这种情况下,我建议将 observable 投影到一个类中,该类体现了“尝试此操作并记录结果,无论是值还是异常”,并在执行链中进一步使用。

    在下面的示例中,我使用“Fallible”类来捕获操作的结果或异常,然后订阅“Fallible”实例流,将错误与值分开。正如您将看到的,由于错误和值共享对底层源的单一订阅,因此代码更简洁且性能更好:

    internal class Fallible
    {
        public static Fallible<TResult> Try<TResult, TException>(Func<TResult> action) where TException : Exception
        {
            try
            {
                return Success(action());
            }
            catch (TException exception)
            {
                return Error<TResult>(exception);
            }
        }
    
        public static Fallible<T> Success<T>(T value)
        {
            return new Fallible<T>(value);
        }
    
        public static Fallible<T> Error<T>(Exception exception)
        {
            return new Fallible<T>(exception);
        }
    }
    
    internal class Fallible<T>
    {
        public Fallible(T value)
        {
            Value = value;
            IsSuccess = true;
        }
    
        public Fallible(Exception exception)
        {
            Exception = exception;
            IsError = true;
        }
    
        public T Value { get; private set; }
        public Exception Exception { get; private set; }
        public bool IsSuccess { get; private set; }
        public bool IsError { get; private set; }
    }
    
    [Fact]
    public void ShouldMapErrorsToFallible()
    {
        Subject<int> source = new Subject<int>();
        List<int> values = new List<int>();
        List<NewObject> errors = new List<NewObject>();
    
        Func<int, int> projection =
            value =>
            {
                if (value % 2 == 1) throw new CustomException("Item is odd");
    
                return value;
            };
    
        var observable = source
            .Select(value => Fallible.Try<int, CustomException>(() => projection(value)))
            .Publish()
            .RefCount();
    
        var errorSubscription = observable
            .Where(fallible => fallible.IsError)
            .Select(fallible => new NewObject(fallible.Exception.Message))
            .Subscribe(errors.Add);
    
        var normalSubscription = observable
            .Where(fallible => fallible.IsSuccess)
            .Select(fallible => fallible.Value)
            .Subscribe(values.Add);
    
        source.OnNext(0);
        source.OnNext(1);
        source.OnNext(2);
    
        Assert.Equal(2, values.Count);
        Assert.Equal(1, errors.Count);
    }
    

    希望对你有帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-02-03
      • 1970-01-01
      • 2015-03-17
      • 2017-09-23
      • 1970-01-01
      • 2018-09-22
      • 2019-09-18
      相关资源
      最近更新 更多