【问题标题】:Throttle RX task based on CPU usage根据 CPU 使用率限制 RX 任务
【发布时间】:2014-08-14 10:00:32
【问题描述】:

我有一个长期运行的任务(从 Kinect One 的深度图像创建纹理),它是使用 Reactive Extensions 实现的。其要点如下:

kinectWrapper.DepthFrames
    .ObserveOn(new EventLoopScheduler())
    .Select(f => do some CPU intensive data manipulation to create the color texture I want)
    .Subscribe(colorFrame => fill texture on GPU)

问题是 select 和 subscribe 在系统上都相当繁重,不会全速运行。我已经设法让它在我的开发 PC 上以可接受的速度运行 .Sample(TimeSpan.FromMilliseconds(100)),但我宁愿让它根据 CPU 使用率降低帧速率。

我认为有两种可能:

  1. 创建一个辅助 IObservable 作为 Sample 的输入,动态限制主事件循环。
  2. 编写我自己的 IScheduler,当它被任务淹没时,它会丢弃计划任务。

【问题讨论】:

    标签: system.reactive


    【解决方案1】:

    可以通过修改此处找到的扩展方法的行为来实现解决方案:http://rxx.codeplex.com/workitem/20724

    下面是一个例子。在这种情况下,我已经修改了行为,以便扩展方法将通过丢弃最旧的通知来限制排队通知的数量,直到队列大小可以接受。

    为满足您的要求,您可以对其进行修改,使其根据您可以使用System.Diagnostics.PerformanceCounter 类读取的 CPU 指标丢弃某些通知。

    但是,您也可以尝试将自己从这些特定细节中抽象出来,也许您可​​以将下面的扩展方法与使用低优先级线程的调度程序一起使用。

    这意味着当 CPU 繁忙时,通知更有可能被丢弃。

    kinectWrapper.DepthFrames.ThrottledObserveOn(
        new EventLoopScheduler(start => new Thread(start) {Priority = ThreadPriority.Lowest, IsBackground = true}),
        5).Select(...
    
    public static IObservable<TSource> ThrottledObserveOn<TSource>(
        this IObservable<TSource> source,
        IScheduler scheduler,
        int maximumQueuedNotifications)
    {
        Contract.Requires(source != null);
        Contract.Requires(scheduler != null);
        Contract.Requires(maximumQueuedNotifications >= 0);
    
        return Observable.Create<TSource>(observer =>
        {
            var notificationsGate = new object();
            var acceptingNotification = false;
            var nextNotifications = new Queue<Notification<TSource>>();
            Notification<TSource> completionNotification = null;
            var schedulerDisposable = new MultipleAssignmentDisposable();
    
            var subscriptionDisposable = source.Materialize().Subscribe(notification =>
            {
                bool startAcceptingNotifications;
    
                lock (notificationsGate)
                {
                    startAcceptingNotifications = !acceptingNotification;
                    acceptingNotification = true;
    
                    if (notification.Kind == NotificationKind.OnNext)
                    {
                        nextNotifications.Enqueue(notification);
                    }
                    else
                    {
                        completionNotification = notification;
                    }
                }
    
                if (startAcceptingNotifications)
                {
                    schedulerDisposable.Disposable = scheduler.Schedule(rescheduleAction =>
                    {
                        Notification<TSource> notificationToAccept;
                        lock (notificationsGate)
                        {
                            if (nextNotifications.Any())
                            {
                                do
                                {
                                    notificationToAccept = nextNotifications.Dequeue();
                                }
                                while (nextNotifications.Count > maximumQueuedNotifications);
                            }
                            else
                            {
                                notificationToAccept = completionNotification;
                                completionNotification = null;
                            }
                        }
    
                        notificationToAccept.Accept(observer);
    
                        bool continueAcceptingNotification;
    
                        lock (notificationsGate)
                        {
                            continueAcceptingNotification = acceptingNotification = nextNotifications.Any() || completionNotification != null;
                        }
    
                        if (continueAcceptingNotification)
                        {
                            rescheduleAction();
                        }
                    });
                }
            });
            return new CompositeDisposable(subscriptionDisposable, schedulerDisposable);
        });
    }
    

    【讨论】:

    • 这似乎是一个不错的方法,但不幸的是 PerformanceCounter 没有提供有关特定线程的分析。他们的系统作为一个整体可能没有那么忙。
    • 您在问题中提到了 CPU 使用率,这就是我提到 PerformanceCounter 类的原因。如果您只是想确保处理这些通知的线程不会被淹没,则可以使用 nextNotifications 队列的大小/和/或大小变化来做出此决定。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-26
    • 2017-01-25
    • 2012-07-06
    • 2016-08-19
    相关资源
    最近更新 更多