【问题标题】:HTTP POST Reactive Extention async. pattern handle errorsHTTP POST 反应扩展异步。模式句柄错误
【发布时间】:2011-11-01 14:24:56
【问题描述】:

我正在尝试使用以下代码,但发生异常时它不起作用。谁能帮我解决这个问题?我试图在 fetchresponse().catch() 中抛出 web 异常。是否可以返回不同类型的数据作为回报(不仅仅是一个字符串)。

IObservable<string> tempReturnData = null;
        try
        {
            // create http web request
            HttpWebRequest WSrequest = (HttpWebRequest)WebRequest.Create(WebURL);
            // instantiate Request class object
            RequestState rs = new RequestState();
            rs.Request = WSrequest;
            // Lock current webrequest object.. incase of retry attempt 
            lock (WSrequest)
            {
                rs.Request.ContentType = "application/x-www-form-urlencoded";
                rs.Request.Method = "POST";
                rs.Request.Timeout = 100;
                // bug in .net that closes the connection prior to it being finished 
                rs.Request.KeepAlive = false;
                rs.Request.ProtocolVersion = HttpVersion.Version10;
                // async pattern get request
                var fetchRequestStream = Observable.FromAsyncPattern<Stream>(rs.Request.BeginGetRequestStream, rs.Request.EndGetRequestStream);
                // async pattern get response
                var fetchResponse = Observable.FromAsyncPattern<WebResponse>(rs.Request.BeginGetResponse, rs.Request.EndGetResponse);
                // 
                tempReturnData = (from tempResult in fetchRequestStream() select tempResult).SelectMany(stream =>
                    {
                        using (var writer = new StreamWriter(stream)) writer.Write(postData);
                        // here i wants to catch web exception in fetchResponse()   FYI : in my function i am returning IObservable<string>
                        return fetchResponse().Catch(Observable.Empty<WebResponse>()).Retry(5);
                    }).Select(result =>
                    {
                        lock (rs)
                        {
                            rs.Response = (HttpWebResponse)result;

                            string s = "";
                            // if response is ok then read response stream data
                            if (rs.Response.StatusCode == HttpStatusCode.OK)
                            {
                                using (StreamReader reader = new StreamReader(rs.Response.GetResponseStream())) s = reader.ReadToEnd();
                            }
                            // Error case if error occurs then try after random time period
                            else
                            {
                                if (Attempt < appConfig.PSPRequestAttempt)
                                {
                                    Attempt++;
                                    RandomisePost(WebURL, postData, Attempt);
                                }
                            }
                            return s;
                        }
                    }); // get response stream data
                return tempReturnData;
            }
        }
        catch (Exception ex)
        {
            // Debug.WriteLine("Exception Occurs   " + ex.Message);
            return null;
        }

【问题讨论】:

    标签: .net multithreading c#-4.0 system.reactive reactive-programming


    【解决方案1】:

    我认为您过于努力地将 Rx 和非 Rx 代码混合在一起。尝试让您的 Rx 代码按照这样的简单 Rx 查询工作:

        return
            from st in fetchRequestStream()
            from rp in postDataAndFetchResponse(st)
            from s in fetchResult(rp)
            select s;
    

    此查询依赖于三个如下所示的函数:Func&lt;X, IObservable&lt;Y&gt;&gt;

    然后,您可以使用标准 RX 运算符处理所有重试和异常处理。无需进行任何时髦的“随机”调用!

    你可以这样称呼它:

    FetchStringFromPost("url", "postData")
    .Retry(3)
    .Subscribe(s => { }, ex =>
    {
        /* Exceptions here! */
    }, () => { });
    

    这是完整的代码:

    public IObservable<string> FetchStringFromPost(string WebURL, string postData)
    {
        var request = (HttpWebRequest)WebRequest.Create(WebURL);
        request.ContentType = "application/x-www-form-urlencoded";
        request.Method = "POST";
        request.Timeout = 100;
        request.KeepAlive = false;
        request.ProtocolVersion = HttpVersion.Version10;
    
        var fetchRequestStream = Observable
            .FromAsyncPattern<Stream>(
                request.BeginGetRequestStream,
                request.EndGetRequestStream);
    
        var fetchResponse = Observable
            .FromAsyncPattern<WebResponse>(
                request.BeginGetResponse,
                request.EndGetResponse);
    
        Func<Stream, IObservable<HttpWebResponse>> postDataAndFetchResponse = st =>
        {
            using (var writer = new StreamWriter(st))
            {
                writer.Write(postData);
            }
            return fetchResponse().Select(rp => (HttpWebResponse)rp);
        };
    
        Func<HttpWebResponse, IObservable<string>> fetchResult = rp =>
        {
            if (rp.StatusCode == HttpStatusCode.OK)
            {
                using (var reader = new StreamReader(rp.GetResponseStream()))
                {
                    return Observable.Return<string>(reader.ReadToEnd());
                }
            }
            else
            {
                var msg = "HttpStatusCode == " + rp.StatusCode.ToString();
                var ex = new System.Net.WebException(msg,
                    WebExceptionStatus.ReceiveFailure);
                return Observable.Throw<string>(ex);
            }
        };
    
        return
            from st in fetchRequestStream()
            from rp in postDataAndFetchResponse(st)
            from s in fetchResult(rp)
            select s;
    }
    

    当我测试上面的代码时,我尝试调用 FetchStringFromPost("http://www.microsoft.com", "foo").Materialize() 并得到了回复:

    看起来像一种享受。告诉我你的进展情况。

    【讨论】:

    • 我在发布数据时出错。 “流不可写。”
    • 我已经解决了这个问题。快速提问我如何在重试尝试中投入时间?
    • @Capricon - 你可以在重试之前添加这个:.Do(x =&gt; { }, ex =&gt; { Thread.Sleep(1000); })
    • 在特定时间安排上调用我的可观察方法的最佳时间表是什么?我在任务计划和 RX 计划之间感到困惑。你以前用过吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多