【问题标题】:Reactive Extensions and Retry反应式扩展和重试
【发布时间】:2014-07-09 13:53:15
【问题描述】:

所以今天早上我的雷达上突然出现了一系列文章。它以 this question 开头,在 GitHub 上导致 the original example 和 source code

我稍微重写了一下,所以我可以开始在控制台和服务应用程序中使用它:

public static class Extensions
{
    static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory());

    // Licensed under the MIT license with <3 by GitHub

    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 8, 16...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    /// <summary>
    /// A linear strategy which starts with 1 second and then 2, 3, 4...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> LinearStrategy = n => TimeSpan.FromSeconds(1*n);

    /// <summary>
    /// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
    /// </summary>
    /// <param name="source">The source observable.</param>
    /// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
    /// <param name="strategy">The strategy to use in backing off, exponential by default.</param>
    /// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
    /// <param name="scheduler">The scheduler.</param>
    /// <returns>
    /// A cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates.
    /// </returns>
    [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? ExponentialBackoff;
        scheduler = scheduler ?? Scheduler;

        if (retryOnError == null)
            retryOnError = e => true;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }
}

现在为了测试它是如何工作的,我写了这个小程序:

class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var cts = new CancellationTokenSource();

        var sched = new TaskPoolScheduler(new TaskFactory());
        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });

        source.RetryWithBackoffStrategy(scheduler: sched, strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);

        while (!cts.IsCancellationRequested)
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message); 

                },
                () =>
                {
                    cts.Cancel();
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }
}

一开始我以为,订阅事件,会自动触发所有后续的retires。情况并非如此,所以我必须实现一个 Cancellation Token 并循环,直到它发出所有 reties 已用尽的信号。

另一种选择是使用 AutoResetEvent:

class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var auto = new AutoResetEvent(false);

        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });

        source.RetryWithBackoffStrategy(strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);

        while (!auto.WaitOne(1))
        {
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message);
                },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                    auto.Set();
                });
        }
    }
}

在这两种情况下,它都会显示这些行:

Action 0
Error: Attempted to divide by zero.
Action 1
Result: yolo
End Processing after 2 attempts

我要向这群人提出的问题是:这是使用此扩展程序的最佳方式吗?或者有没有办法订阅 Observable 以便它自己重新触发,直到重试次数?

最终更新

根据 Brandon 的建议,这是正确的订阅方式:

internal class Program
{
    #region Methods

    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(strategy: Extensions.ExponentialBackoff, retryOnError: exception => exception is DivideByZeroException, scheduler: Scheduler.Immediate)
            .Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }

    #endregion
}

输出会略有不同:

Action 0
Action 1
Result: yolo
End Processing after 2 attempts

结果证明这是一个非常有用的扩展。这是另一个如何使用它的示例,其中使用委托给出策略和错误处理。

internal class Program
{
    #region Methods

    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(
            strategy: i => TimeSpan.FromMilliseconds(1),
            retryOnError: exception =>
            {
                if (exception is DivideByZeroException)
                {
                    Console.WriteLine("Tried to divide by zero");
                    return true;
                }
                return false;
            },
            scheduler: Scheduler.Immediate).Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("Succeeded after {0} attempts", tryCount);
                });
    }

    #endregion
}

输出:

Action 0
Tried to divide by zero
Action 1
Result: yolo
Succeeded after 2 attempts

【问题讨论】:

    标签: c# system.reactive reactive-programming


    【解决方案1】:

    是的,Rx 通常是异步的,因此在编写测试时,您需要等待它完成(否则 Main 在您调用订阅后立即退出)。

    另外,请确保您订阅了通过调用 source.RetryWithBackoffStrategy(...) 生成的 observable。这会产生一个具有重试语义的 new 可观察对象。

    在这种情况下,最简单的解决方案是直接使用Wait

    try
    {
      var source2 = source.RetryWithBackoffStrategy(/*...*/);
    
      // blocks the current thread until the source finishes
      var result = source2.Wait(); 
      Console.WriteLine("result=" + result);
    }
    catch (Exception err)
    {
      Console.WriteLine("uh oh", err);
    }
    

    如果您使用 NUnit(支持异步测试)之类的东西来编写测试,那么您可以这样做:

    [Test]
    public async Task MyTest()
    {
        var source = // ...;
        var source2 = source.RetryWithBackoffStrategy(/*...*/);
        var result = await source2; // you can await observables
        Assert.That(result, Is.EqualTo(5));
    }
    

    【讨论】:

    • Wait 会在遇到第一个错误后退出,不会调用最终的 Action。
    • 您的 RetryWithBackoffStrategy() 正在处理错误并重试序列。
    • 就是这样,它会在后续订阅时重试,但我宁愿订阅一次,并希望所有重试,直到成功事件,都会自动为我触发。
    • 哦,我看错了你的代码。问题是你需要订阅source.RetryWithBackoffStrategy(...)不是 source
    • 完全准确地说,Rx 默认情况下是绝对同步的。有些东西,比如延迟操作符,默认使用异步调度器,因为这是最常见的用例。但是,Map、Reduce、Filter、Retry 等默认是同步的。话虽如此,大多数 Rx 的使用最终都是异步的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多