【问题标题】:Convert IEnumerable<Task<T>> to IObservable<T> with exceptions handling使用异常处理将 IEnumerable<Task<T>> 转换为 IObservable<T>
【发布时间】:2018-10-07 21:04:29
【问题描述】:

我想将IEnumerable&lt;Task&lt;T&gt;&gt; 转换为IObservable&lt;T&gt;。我找到了这个here的解决方案:

IObservable<T> ToObservable<T>(IEnumerable<Task<T>> source)
{
    return source.Select(t => t.ToObservable()).Merge();
}

通常情况下完全可以,但我需要处理异常,这可能会在该任务中引发...所以IObservable&lt;T&gt; 在第一次异常后不应该死。

我读到的,这个用例的建议是使用一些包装器,它会携带实际值或错误。所以我的尝试是

IObservable<Either<T, Exception>> ToObservable<T>(IEnumerable<Task<T>> source)
{
    var subject = new Subject<Either<T, Exception>>();

    foreach (var observable in GetIntsIEnumerable().Select(t => t.ToObservable()))
    {
        observable.Subscribe(i => subject.OnNext(i), e => subject.OnNext(e));
    }

    return subject;
}

Either&lt;T, Exception&gt; 借用自 this article

但这也不行,因为OnCompleted() 没有被调用。我该如何解决?我对 Rx 概念很陌生。

这里是完整的测试代码...

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static Task Main()
        {
            SemaphoreSlim signal = new SemaphoreSlim(0, 1);

            //GetInts1().Subscribe(
            //    i => Console.WriteLine($"OK: {i}"),
            //    e => Console.WriteLine($"ERROR: {e.Message}"),
            //    () => signal.Release());

            GetInts2().Subscribe(r => Console.WriteLine(r.Match(
                    i => $"OK: {i}",
                    e => $"ERROR: {e.Message}")),
                () => signal.Release());

            return signal.WaitAsync();
        }

        static IObservable<int> GetInts1()
        {
            return GetIntsIEnumerable().Select(t => t.ToObservable()).Merge();
        }

        static IObservable<Either<int, Exception>> GetInts2()
        {
            var subject = new Subject<Either<int, Exception>>();

            foreach (var observable in GetIntsIEnumerable().Select(t => t.ToObservable()))
            {
                observable.Subscribe(i => subject.OnNext(i), e => subject.OnNext(e));
            }

            return subject;
        }

        static IEnumerable<Task<int>> GetIntsIEnumerable()
        {
            Random rnd = new Random();

            foreach (int i in Enumerable.Range(1, 10))
            {
                yield return Task.Run(async () =>
                {
                    await Task.Delay(rnd.Next(0, 5000));

                    if (i == 6)
                        throw new ArgumentException();

                    return i;
                });
            }
        }
    }

    /// <summary>
    /// Functional data data to represent a discriminated
    /// union of two possible types.
    /// </summary>
    /// <typeparam name="TL">Type of "Left" item.</typeparam>
    /// <typeparam name="TR">Type of "Right" item.</typeparam>
    public class Either<TL, TR>
    {
        private readonly TL left;
        private readonly TR right;
        private readonly bool isLeft;

        public Either(TL left)
        {
            this.left = left;
            this.isLeft = true;
        }

        public Either(TR right)
        {
            this.right = right;
            this.isLeft = false;
        }

        public T Match<T>(Func<TL, T> leftFunc, Func<TR, T> rightFunc)
        {
            if (leftFunc == null)
            {
                throw new ArgumentNullException(nameof(leftFunc));
            }

            if (rightFunc == null)
            {
                throw new ArgumentNullException(nameof(rightFunc));
            }

            return this.isLeft ? leftFunc(this.left) : rightFunc(this.right);
        }

        /// <summary>
        /// If right value is assigned, execute an action on it.
        /// </summary>
        /// <param name="rightAction">Action to execute.</param>
        public void DoRight(Action<TR> rightAction)
        {
            if (rightAction == null)
            {
                throw new ArgumentNullException(nameof(rightAction));
            }

            if (!this.isLeft)
            {                
                rightAction(this.right);
            }
        }

        public TL LeftOrDefault() => this.Match(l => l, r => default);

        public TR RightOrDefault() => this.Match(l => default, r => r);

        public static implicit operator Either<TL, TR>(TL left) => new Either<TL, TR>(left);

        public static implicit operator Either<TL, TR>(TR right) => new Either<TL, TR>(right);
    }
}

【问题讨论】:

    标签: c# task system.reactive ienumerable


    【解决方案1】:

    有一种处理此类错误的内置机制。只需使用 .Materialize() 运算符,它将 IObservable&lt;T&gt; 更改为 IObservable&lt;Notification&lt;T&gt;&gt; 并允许将错误和完成视为正常值。

    因此,例如,Observable.Return&lt;int&gt;(42) 产生一个值 42 和一个完成,但 Observable.Return&lt;int&gt;(42).Materialize() 产生一个值 Notification.CreateOnNext&lt;int&gt;(42),然后是一个值 Notification.CreateOnCompleted&lt;int&gt;(),然后是一个正常的完成。

    如果您有一个产生错误的序列,那么您实际上会得到一个值Notification.CreateOnError&lt;T&gt;(exception),然后是正常完成。

    这一切都意味着您可以像这样更改代码:

    IObservable<Notification<T>> ToObservable<T>(IEnumerable<Task<T>> source)
    {
        return source.Select(t => t.ToObservable().Materialize()).Merge();
    }
    

    根据我的喜好,您的测试代码有点复杂。您永远不需要像使用它们那样使用SemaphoreSlimSubject

    我已经编写了自己的测试代码。

    void Main()
    {
        var r = new Random();
    
        IEnumerable<Task<int>> source =
            Enumerable
                .Range(0, 10).Select(x => Task.Factory.StartNew(() =>
        {
            Thread.Sleep(r.Next(10000));
            if (x % 3 == 0) throw new NotSupportedException($"Failed on {x}");
            return x;
        }));
    
        IObservable<Notification<int>> query = source.ToObservable();
    
        query
            .Do(x =>
            {
                if (x.Kind == NotificationKind.OnError)
                {
                    Console.WriteLine(x.Exception.Message);
                }
            })
            .Where(x => x.Kind == NotificationKind.OnNext) // Only care about vales
            .Select(x => x.Value)
            .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("Done."));
    }
    
    public static class Ex
    {
        public static IObservable<Notification<T>> ToObservable<T>(this IEnumerable<Task<T>> source)
        {
            return source.Select(t => t.ToObservable().Materialize()).Merge();
        }
    }
    

    该代码的典型运行会产生:

    失败 3 2 5 4 0 失败 9日失败 6日失败 7 1 8 完毕。

    【讨论】:

    • 谢谢。我不知道Notification&lt;T&gt; 包装器。这是更好、更清洁的解决方案。 SemaphoreSlim 仅用于测试。我想在调用 OnCompleted() 之前保持控制台应用程序运行。
    • @MatějPokorný - 在处理异常并过滤掉完整信号后,您甚至可以使用 .Dematerialize() 将流返回到标准 IObservable&lt;T&gt;
    【解决方案2】:

    Rx 库包含一个 Merge 重载,它直接有效地合并任务,而不是将每个任务转换为中间丢弃的可观察序列:

    // Merges results from all source tasks into a single observable sequence.
    public static IObservable<TSource> Merge<TSource>(
        this IObservable<Task<TSource>> sources);
    

    您可以使用此运算符来实现ToObservable 方法,如下所示:

    IObservable<Either<T, Exception>> ToObservable<T>(IEnumerable<Task<T>> source)
    {
        return source
            .Select(async task =>
            {
                try { return new Either<T, Exception>(await task); }
                catch (Exception ex) { return new Either<T, Exception>(ex); }
            })
            .ToObservable()
            .Merge();
    }
    

    您可以将ToObservable 运算符放在Select 运算符之前或之后,没有任何区别。

    顺便说一句,有一个可用的简约库(Stephen Cleary 的Try)包含一个Try&lt;T&gt; 类型,其功能类似于Either 类型,但专门用于将Exception 作为第二个类型类型(作为Either&lt;T, Exception&gt;)。使用这个库,您可以像这样实现ToObservable 方法:

    IObservable<Try<T>> ToObservable<T>(IEnumerable<Task<T>> source)
    {
        return source
            .Select(task => Try.Create(() => task))
            .ToObservable()
            .Merge();
    }
    

    这里是Try.Create方法的定义:

    // Executes the specified function, and wraps either the result or the exception.
    public static Task<Try<T>> Create<T>(Func<Task<T>> func);
    

    【讨论】:

      猜你喜欢
      • 2013-06-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多