【Question title】: Polly CircuitBreaker fallback not working
【Posted】: 2018-12-20 16:16:39
【Question】:

I have the following policies:

var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).WaitAndRetry(
                retryCount: maxRetryCount,
                sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (exception, calculatedWaitDuration, retryCount, context) =>
                {
                    Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
                });

var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).CircuitBreaker(maxExceptionsBeforeBreaking, TimeSpan.FromSeconds(circuitBreakDurationSeconds), onBreak, onReset);

var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions, onBulkheadRejected);

var fallbackForCircuitBreaker = Policy<bool>
             .Handle<BrokenCircuitException>()
             .Fallback(
                 fallbackValue: false,
                 onFallback: (b, context) =>
                 {
                     Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                 }
             );

            var fallbackForAnyException = Policy<bool>
                .Handle<Exception>()
                .Fallback(
                    fallbackAction: (context) => { return false; },
                    onFallback: (e, context) =>
                    {
                        Log.Error($"An unexpected error occurred => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                    }
                );

var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
            var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));

public bool CallApi(ChangeMapModel changeMessage)
    {
        var httpClient = new HttpClient();
        var endPoint = changeMessage.EndPoint;
        var headers = endPoint.Headers;
        if (headers != null)
        {
            foreach (var header in headers)
            {
                if (header.Contains(':'))
                {
                    var splitHeader = header.Split(':');
                    httpClient.DefaultRequestHeaders.Add(splitHeader[0], splitHeader[1]); 
                }
            } 
        }

        var res = httpClient.PostAsync(endPoint.Uri, null);
        var response = res.Result;
        response.EnsureSuccessStatusCode();
        return true;
    }

I execute the policy like this:

policyWrap.Execute((context) => CallApi(changeMessage), new Context(endPoint));

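The problem is that the circuit breaker fallback is not hit when an action is executed on an open circuit.

I want API calls to go through the policy, with HttpRequestException as the exception type to be handled. Is there something wrong with the policy definitions? Why is the circuit breaker fallback not invoked?

【Comments】: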

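  • Can you show the content of the CallApi(...) method? Or provide a stub/replacement CallApi(...) method that turns the whole thing into a verifiable example? *.com/help/mcve . Here is example code proving FallbackPolicy can handle BrokenCircuitException; it is also available as a runnable dotnetfiddle.
  • I have added the code for CallApi to the post. I particularly suspect my policy definitions, especially the combination of exceptions handled by the retry policy and the circuit breaker policy.
  • Thanks for posting the CallApi() method. I was able to create a commented *.com/help/mcve based on the posted code. Your exact cause may depend on the particular configuration parameters you chose, but hopefully the fully commented example provides enough avenues to explore.

Tags: c# asp.net polly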


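【Answer 1】:

I created the following minimum, complete, verifiable example to help explore the problem:

Note: this is not necessarily a finished product; it is the posted code with some minor modifications and extra comments, to help explore the problem.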

using Polly;
using Polly.CircuitBreaker;
using System;
using System.Net.Http;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        int maxRetryCount = 6;
        double circuitBreakDurationSeconds = 0.2 /* experiment with effect of shorter or longer here, eg: change to = 1, and the fallbackForCircuitBreaker is correctly invoked */ ;
        int maxExceptionsBeforeBreaking = 4; /* experiment with effect of fewer here, eg change to = 1, and the fallbackForCircuitBreaker is correctly invoked */
        int maxParallelizations = 2;
        int maxQueuingActions = 2;

        var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || (/*!(e is BrokenCircuitException) &&*/ e.InnerException is HttpRequestException))) // experiment with introducing the extra (!(e is BrokenCircuitException) && ) clause here, if necessary/desired, depending on goal
            .WaitAndRetry(
                retryCount: maxRetryCount,
                sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(50 * Math.Pow(2, attempt)),
                onRetry: (ex, calculatedWaitDuration, retryCount, context) =>
                {
                    Console.WriteLine(String.Format("Retry => Count: {0}, Wait duration: {1}, Policy Wrap: {2}, Policy: {3}, Endpoint: {4}, Exception: {5}", retryCount, calculatedWaitDuration, context.PolicyWrapKey, context.PolicyKey, context.OperationKey, ex.Message));
                });

        var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException))
            .CircuitBreaker(maxExceptionsBeforeBreaking,
                TimeSpan.FromSeconds(circuitBreakDurationSeconds),
                onBreak: (ex, breakDuration) => {
                    Console.WriteLine(String.Format("Circuit breaking for {0} ms due to {1}", breakDuration.TotalMilliseconds, ex.Message));
                },
                onReset: () => {
                    Console.WriteLine("Circuit closed again.");
                },
                onHalfOpen: () => { Console.WriteLine("Half open."); });

        var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions);

        var fallbackForCircuitBreaker = Policy<bool>
             .Handle<BrokenCircuitException>()
            /* .OrInner<BrokenCircuitException>() */ // Consider this if necessary.
            /* .Or<Exception>(e => circuitBreaker.CircuitState != CircuitState.Closed) */ // This check will also detect the circuit in anything but healthy state, regardless of the final exception thrown.
             .Fallback(
                 fallbackValue: false,
                 onFallback: (b, context) =>
                 {
                     Console.WriteLine(String.Format("Operation attempted on broken circuit => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey));
                 }
             );

        var fallbackForAnyException = Policy<bool>
                .Handle<Exception>()
                .Fallback<bool>(
                    fallbackAction: (context) => { return false; },
                    onFallback: (e, context) =>
                    {
                        Console.WriteLine(String.Format("An unexpected error occurred => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}, Exception: {3}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey, e.Exception.Message));
                    }
                );

        var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
        var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));

        bool outcome = policyWrap.Execute((context) => CallApi("http://www.doesnotexistattimeofwriting.com/"), new Context("some endpoint info"));
    }

    public static bool CallApi(string uri)
    {
        using (var httpClient = new HttpClient() { Timeout = TimeSpan.FromSeconds(1) }) // Consider HttpClient lifetimes and disposal; this pattern is for minimum change from original posted code, not a recommendation.
        {
            Task<HttpResponseMessage> res = httpClient.GetAsync(uri);
            var response = res.Result; // Consider async/await rather than blocking on the returned Task.
            response.EnsureSuccessStatusCode();
            return true;
        }
    }
}

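There may be more than one factor causing fallbackForCircuitBreaker not to be invoked:

  1. circuitBreakDurationSeconds may be set shorter than the total time taken by the various tries and the waits between retries.

If so, the circuit may revert to the half-open state. In the half-open state or the closed state, an exception that causes the circuit to break is rethrown as-is. BrokenCircuitException is thrown only when a (fully) open circuit prevents a call from being attempted.

So if your circuit has reverted to half-open by the time the retries are exhausted, the exception thrown back to the wrapping fallback policies will be an HttpRequestException, not a BrokenCircuitException.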

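  2. The .Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)) clauses may catch BrokenCircuitExceptions whose InnerException is an HttpRequestException.

A BrokenCircuitException contains the exception that caused the circuit to break as its InnerException. So an overly greedy/loose check of e.InnerException is HttpRequestException could also catch a BrokenCircuitException whose InnerException is an HttpRequestException. This may or may not be desired, depending on your goal.

I believe this does not happen with the code as originally posted, because of the particular way it is constructed. Blocking on the Task returned by HttpClient.DoSomethingAsync(...) already produces an AggregateException->HttpRequestException, meaning a resulting BrokenCircuitException nests the HttpRequestException two levels deep:

BrokenCircuitException -> AggregateException -> HttpRequestException

So this does not fall within the one-deep check in the posted code. Be aware, however, that a BrokenCircuitException contains the exception that caused the circuit to break as its InnerException. This could cause a handle clause checking e.InnerException is HttpRequestException to retry a BrokenCircuitException unnecessarily (if that is not your goal), if either:

(a) the code is changed to async/await, which removes the AggregateException and so makes the nesting only one level deep; or

(b) the code is changed to Polly's .HandleInner<HttpRequestException>() syntax, which is recursively greedy and so would catch a two-deep nested BrokenCircuitException->AggregateException->HttpRequestException.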
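The nesting described in point 2 can be checked without Polly at all. The following is a minimal sketch, not the answer's original code: StandInBrokenCircuitException is a hypothetical stand-in for Polly's BrokenCircuitException (so that the sketch needs no package reference), and AnyDepth is only an approximation, written for illustration, of a recursive inner-exception check such as the one .HandleInner<HttpRequestException>() performs.

```csharp
using System;
using System.Net.Http;

// Hypothetical stand-in for Polly's BrokenCircuitException (assumption: like the
// real type, it carries the exception that broke the circuit as InnerException).
public class StandInBrokenCircuitException : Exception
{
    public StandInBrokenCircuitException(Exception inner)
        : base("The circuit is now open and is not allowing calls.", inner) { }
}

public static class NestingDemo
{
    // The one-deep predicate used by the retry and circuit breaker policies in the posted code.
    public static bool OneDeep(Exception e)
    {
        return e is HttpRequestException || e.InnerException is HttpRequestException;
    }

    // Illustrative recursive check: walks the whole InnerException chain.
    public static bool AnyDepth(Exception e)
    {
        for (Exception current = e; current != null; current = current.InnerException)
        {
            if (current is HttpRequestException) return true;
        }
        return false;
    }

    public static void Main()
    {
        var http = new HttpRequestException("simulated failure");

        // Blocking on .Result: the breaker sees AggregateException -> HttpRequestException.
        var whenBlocking = new StandInBrokenCircuitException(new AggregateException(http));

        // async/await: the breaker sees the HttpRequestException directly.
        var whenAwaiting = new StandInBrokenCircuitException(http);

        Console.WriteLine(OneDeep(whenBlocking));  // False: nested two deep, not matched
        Console.WriteLine(OneDeep(whenAwaiting));  // True: nested one deep, matched
        Console.WriteLine(AnyDepth(whenBlocking)); // True: a recursive check matches either way
    }
}
```

Whether matching the broken-circuit exception in the retry policy is desirable depends on the goal; the sketch only shows when the predicate starts matching it.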


The /* commented out */ // with additional explanation suggestions in the code above indicate how the posted code may be adjusted so that fallbackForCircuitBreaker is invoked as expected.
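Point 1 can also be observed in isolation. The sketch below is illustrative only and assumes a reference to the Polly package (the synchronous API used elsewhere in this thread); the class name HalfOpenDemo, the helper Try, and the 200 ms / 300 ms timings are choices made for this example, not values from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using Polly;
using Polly.CircuitBreaker;

public static class HalfOpenDemo
{
    // Runs an always-failing action through the policy and reports
    // the type name of the exception that escapes, or "none".
    static string Try(ISyncPolicy policy)
    {
        Action failingCall = () => { throw new HttpRequestException("simulated failure"); };
        try
        {
            policy.Execute(failingCall);
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    public static List<string> Run()
    {
        // Break after a single handled exception; stay open for 200 ms.
        var breaker = Policy
            .Handle<HttpRequestException>()
            .CircuitBreaker(1, TimeSpan.FromMilliseconds(200));

        var seen = new List<string>();
        seen.Add(Try(breaker)); // circuit closed: the call runs and its own exception is rethrown
        seen.Add(Try(breaker)); // circuit open: the call is blocked by the breaker
        Thread.Sleep(300);      // wait longer than the break duration
        seen.Add(Try(breaker)); // half-open: a trial call runs and its own exception is rethrown
        return seen;
    }

    public static void Main()
    {
        foreach (var name in Run())
        {
            Console.WriteLine(name);
        }
    }
}
```

Under these assumptions, only the second attempt should surface a BrokenCircuitException; the first and third should surface the HttpRequestException itself. That is the situation in which a fallback handling only BrokenCircuitException is skipped.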


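Two other thoughts:

  3. Consider changing to async/await throughout, if possible.

Blocking on HttpClient.DoSomethingAsync() by calling .Result can hurt performance, risks deadlocks if mixed with other async code, and brings in the whole AggregateException-with-InnerException pain.

  4. Consider the disposal and lifetime of HttpClient instances.

(Points 3 and 4 are intentionally kept brief, as they are widely discussed elsewhere.)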
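To illustrate the AggregateException part of point 3, here is a small sketch that uses only the base class library. FailingCallAsync is a hypothetical stand-in for a failing HttpClient call; it is not part of the posted code.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

public static class BlockingVsAwait
{
    // Hypothetical stand-in for a failing HttpClient call such as PostAsync.
    static async Task<bool> FailingCallAsync()
    {
        await Task.Yield();
        throw new HttpRequestException("simulated failure");
    }

    // Blocks on the task, as the posted CallApi does with .Result.
    public static string CaughtWhenBlocking()
    {
        try
        {
            return FailingCallAsync().Result.ToString();
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Awaits the task instead of blocking on it.
    public static async Task<string> CaughtWhenAwaitingAsync()
    {
        try
        {
            return (await FailingCallAsync()).ToString();
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    public static void Main()
    {
        // .Result wraps the failure in an AggregateException.
        Console.WriteLine(CaughtWhenBlocking()); // AggregateException

        // await unwraps it, so the HttpRequestException is observed directly.
        // (Blocking here is only to keep Main synchronous in a console sketch.)
        Console.WriteLine(CaughtWhenAwaitingAsync().GetAwaiter().GetResult()); // HttpRequestException
    }
}
```

With await, a policy declared as .Handle<HttpRequestException>() would see the exception directly, so the extra e.InnerException check would no longer be needed for that purpose.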

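【Comments】:

  • Thank you very much for the answer. It is very helpful. "retry a BrokenCircuitException unnecessarily" - don't we still want to retry even if the circuit is broken?
  • "Don't we still want to retry even if the circuit is broken?" -> Agreed. That may in fact mean you want to switch the retry to .HandleInner<HttpRequestException>(), or explicitly add .Or<BrokenCircuitException>(), if/while async/await is not in use.
  • I decided to switch to async policies, which is giving me problems. I have posted a separate question for that. Would you mind taking a look? *.com/questions/53906021/…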