【问题标题】:Rx TaskPoolScheduler vs EventLoopScheduler, memory usageRx TaskPoolScheduler vs EventLoopScheduler,内存使用
【发布时间】:2014-10-02 11:36:53
【问题描述】:

我正在尝试对众多独立数据源进行 POC。一种经典的观察者风格的应用程序。数据馈送的数量可能从几百到几千不等,观察者的数量可能从 2 到 20000 不等。下面是一个简单的数据馈送可观察模型的快速示例:

    public class FeedMockUp
    {
        private readonly IScheduler observerScheduler;
        private readonly Random rnd = new Random((int)DateTime.Now.Ticks);

        private readonly Subject<double> sourceObservable;
        private readonly IObservable<double> testedObservable;

        public FeedMockUp(IScheduler observerScheduler)
        {
            this.observerScheduler = observerScheduler;
            sourceObservable = new Subject<double>();

            testedObservable =
                Observable.Create<double>(x =>
                {
                    var underlyingSourceDisposable =
                        sourceObservable
                            .Subscribe(_ => x.OnNext(rnd.NextDouble()));
                    return underlyingSourceDisposable;
                });
        }

        public IDisposable SubscribeToUnderlyingFeed(int numberOfSubscribers)
        {
            int counter = 0;
            var disposable = new CompositeDisposable();

            for (int i = 0; i < numberOfSubscribers; i++)
            {
                disposable.Add(testedObservable
                    .ObserveOn(observerScheduler)
                    .Subscribe(_ => Interlocked.Increment(ref counter)));
            }

            return disposable;
        }

        public void PushNewFeed()
        {
            sourceObservable.OnNext(rnd.NextDouble());
        }
    }    

当我为了提高 observables 更新的吞吐量而使用 shedulers 时,我注意到在使用 EventLoopScheduler 时,具有 100 个数据馈送和 1000 个观察者的应用程序的内存消耗是相当恒定的,对于 1000 个观察者来说,它是~100Mb 并且在添加新的观察者时线性增长。

但是,当我尝试使用 TaskPoolScheduler 时,在 x86 进程上我开始收到 OutOfMemoryException 异常,并且在 x64 进程上,内存消耗激增,或者更确切地说,对于 500 名观察者来说,内存消耗从 1Gb 到 2Gb 变得非常不确定,而且还在不断增长几乎成倍增加新的观察者。

这是我一直用于测试的代码。你能看出它有什么问题吗?为什么会有这样的性能差异?猜测,这里涉及一些内部复制/排队,但这只是我的猜测。理想情况下,我想知道这里发生了什么,而不需要深入研究 RX 代码库......

    private static void Main(string[] args)
    {
        const int displayItemCount = 100;
        const int callbackCount = 500;

        //var rtScheduler = new EventLoopScheduler(); 
        var rtScheduler = TaskPoolScheduler.Default;
        var rtFeeds = new List<FeedMockUp>();
        for (int i = 0; i < displayItemCount; i++)
        {
            var mockFeed = new FeedMockUp(rtScheduler);
            mockFeed.SubscribeToUnderlyingFeed(callbackCount);
            rtFeeds.Add(mockFeed);
        }
        foreach (var rtFeedMockUp in rtFeeds)
        {
            rtFeedMockUp.PushNewFeed();
        }
        Console.WriteLine("Memory used for feed {0} mockups with {1} observers / callbacks. Memory {2} Mb",
            displayItemCount, callbackCount, Environment.WorkingSet / (1024 * 1024));
Console.ReadKey();

}

【问题讨论】:

    标签: c#-4.0 system.reactive reactive-programming


    【解决方案1】:

    使用ObserveOnTaskPoolScheduler 本质上是要为每个观察者创建一个LongRunning 任务。

    而默认的TaskSchedulercreating a Thread for each LongRunning tasks 结束。

    每个线程为其堆栈保留大约 1MB。

    因此,使用TaskPoolScheduler 的 500 名观察者将至少保留 500MB。你可以看到这是怎么回事...

    另一方面,EventLoopScheduler 在单线程上运行。因此,将ObserveOn 与此调度程序一起使用实际上只是将一个条目添加到调度程序的工作队列中。这个条目比一个线程的 1MB 成本要小得多。

    所以,EventLoopScheduler 在这种情况下内存效率更高,但它也会连续通知观察者,如果有很多观察者并且源以高频率生成,那么你将开始累积未发送事件的缓冲区。

    TaskPoolScheduler 的内存效率较低,但会同时通知观察者,因此可以通过利用计算机上的所有内核来处理比EventLoopScheduler 更高频率的事件。

    【讨论】:

    • 您还可以为每个核心创建一个EventLoopScheduler,并在入口点上实现一个负载均衡器;例如,循环调度程序选择。这可以封装在它自己的 IScheduler 实现中,这样你只需要传递一个引用。
    • 当线程池开始增加它的线程数时,看着内存呈指数增长非常有趣! imgur.com/NGnDOH0 修正了戴夫的错字。另外,我过去曾成功使用过您建议的方法。
    【解决方案2】:

    您可能想使用TaskPoolScheduler.Default.DisableOptimizations(typeof(ISchedulerLongRunning))。如果您不介意失去并行性,EventLoopScheduler 是一个不错的选择。

    如果您仍想并行执行工作但想使用线程池线程,则此选项更可取。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-10-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-17
      • 2013-10-28
      • 1970-01-01
      相关资源
      最近更新 更多