【发布时间】:2012-10-05 17:36:39
【问题描述】:
我正在使用 Reactive Extensions 创建匿名 Observable 来执行 Web 请求,对其进行后处理并返回一些值。这种技术帮助我将价值获取机制与使用这些值的其他类区分开来。 Web 请求经常执行,因此有一个计时器可以使用这种技术构造和调用请求。我构建 Observable 如下:
Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse)
实际上,端点并不总是可用的。在 Subscribe() 方法中指定的 Web 请求超时回调的情况下,永远不会调用方法。我正在使用Timeout() Rx 扩展来处理这种情况。我也在做请求结果的后处理。这是我为解决该问题而实施的一段模拟代码:
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...)
然后我实现了处理 Subscribe() 方法的返回值来取消订阅 Observable 的回调委托。实际上我有一组三元组<IObservable obs, IDisposable disp, bool RequestComplete>。 Subscribe() 方法回调设置 RequestComplete = true 并且随着时间的推移,如果不再需要所有一次性用品,即已调用回调,则所有一次性用品都会被处置。一切似乎都很好,但我注意到收藏在不断增长,我不知道为什么。
所以我实现了这种情况的模拟。此代码没有处理部分 - 我将其替换为简单的请求计数器:
public static class RequestCounter
{
private static object syncRoot = new object();
private static int count = 0;
public static void Increment()
{
lock (syncRoot)
{
count++;
}
}
public static void Decrement()
{
lock (syncRoot)
{
count--;
}
}
public static int RequestCount
{
get { return count; }
}
}
这是测试本身:
public void RunTest()
{
while (true)
{
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
RequestCounter.Increment();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
res =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
ex =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
() =>
{
});
}
}
在网络请求计数器增加之前。请求超时后,它会递减。但是请求计数器会无限增长。你能解释一下我错过了什么吗?
【问题讨论】:
-
原来线程池没有空闲线程来处理递减
标签: c# system.reactive