【发布时间】:2014-10-02 11:36:53
【问题描述】:
我正在尝试对众多独立数据源进行 POC。一种经典的观察者风格的应用程序。数据馈送的数量可能从几百到几千不等,观察者的数量可能从 2 到 20000 不等。下面是一个简单的数据馈送可观察模型的快速示例:
public class FeedMockUp
{
private readonly IScheduler observerScheduler;
private readonly Random rnd = new Random((int)DateTime.Now.Ticks);
private readonly Subject<double> sourceObservable;
private readonly IObservable<double> testedObservable;
public FeedMockUp(IScheduler observerScheduler)
{
this.observerScheduler = observerScheduler;
sourceObservable = new Subject<double>();
testedObservable =
Observable.Create<double>(x =>
{
var underlyingSourceDisposable =
sourceObservable
.Subscribe(_ => x.OnNext(rnd.NextDouble()));
return underlyingSourceDisposable;
});
}
public IDisposable SubscribeToUnderlyingFeed(int numberOfSubscribers)
{
int counter = 0;
var disposable = new CompositeDisposable();
for (int i = 0; i < numberOfSubscribers; i++)
{
disposable.Add(testedObservable
.ObserveOn(observerScheduler)
.Subscribe(_ => Interlocked.Increment(ref counter)));
}
return disposable;
}
public void PushNewFeed()
{
sourceObservable.OnNext(rnd.NextDouble());
}
}
当我为了提高 observables 更新的吞吐量而使用 shedulers 时,我注意到在使用 EventLoopScheduler 时,具有 100 个数据馈送和 1000 个观察者的应用程序的内存消耗是相当恒定的,对于 1000 个观察者来说,它是~100Mb 并且在添加新的观察者时线性增长。
但是,当我尝试使用 TaskPoolScheduler 时,在 x86 进程上我开始收到 OutOfMemoryException 异常,并且在 x64 进程上,内存消耗激增,或者更确切地说,对于 500 名观察者来说,内存消耗从 1Gb 到 2Gb 变得非常不确定,而且还在不断增长几乎成倍增加新的观察者。
这是我一直用于测试的代码。你能看出它有什么问题吗?为什么会有这样的性能差异?猜测,这里涉及一些内部复制/排队,但这只是我的猜测。理想情况下,我想知道这里发生了什么,而不需要深入研究 RX 代码库......
private static void Main(string[] args)
{
const int displayItemCount = 100;
const int callbackCount = 500;
//var rtScheduler = new EventLoopScheduler();
var rtScheduler = TaskPoolScheduler.Default;
var rtFeeds = new List<FeedMockUp>();
for (int i = 0; i < displayItemCount; i++)
{
var mockFeed = new FeedMockUp(rtScheduler);
mockFeed.SubscribeToUnderlyingFeed(callbackCount);
rtFeeds.Add(mockFeed);
}
foreach (var rtFeedMockUp in rtFeeds)
{
rtFeedMockUp.PushNewFeed();
}
Console.WriteLine("Memory used for feed {0} mockups with {1} observers / callbacks. Memory {2} Mb",
displayItemCount, callbackCount, Environment.WorkingSet / (1024 * 1024));
Console.ReadKey();
}
【问题讨论】:
标签: c#-4.0 system.reactive reactive-programming