【问题标题】:How to handle processing real-time events that fire before processing of previous event is complete (C#)如何处理在处理先前事件完成之前触发的实时事件(C#)
【发布时间】:2011-12-27 06:22:54
【问题描述】:

假设我们有一个实时事件的侦听器,它在事件被触发时执行一些代码块。

对于我们的讨论,假设我们有一个 MyTime 类,它有一个成员 currentTime。

我们已对其进行了设置,以便每当计算机时钟发生变化时,currentTime 都会设置为当前时间的值。 我们已经为 currentTime 对象实现了属性更改的 INotifyPropertyChanged 接口:

public event PropertyChangedEventHandler PropertyChanged;


       public string currentTime
            {
                get { return _currentTime; }
                set { _currentTime= value; this.NotifyPropertyChanged("currentTime"); } 
             }


    public void NotifyPropertyChanged(object sender, PropertyChangedEventArgs e) {

    if (PropertyChanged != null)
                PropertyChanged(this, new PropertyChangedEventArgs(name));

    }

其他一些类,比如 ProcessTime 正在监听这个事件:

TimeChanged += new PropertyChangedEventHandler(PropertyChanged};

它有一个可以执行某事的函数:

public void TimeChanged(object sender, PropertyChangedEventArgs e)
{

// Process lots of calculations

}

由于我们的计算机时钟一直在变化,它会不断触发事件。 据我了解,一旦发生第一次更改,我们将执行 TimeChanged 块。在执行过程中,我们会不断收到越来越多的通知并尽可能快地处理它们,从而形成一长串仍有待处理的事件。

问题在于,在我们处理了第一次更改并继续进行下一次更改后,“实时”已经遥遥领先,无论我们在计算什么,我们都是在计算过去发生的事情。

我们想要做的是忽略所有新事件,直到我们完成原始处理,然后才重新开始侦听该事件。

设置多个线程不是一种选择,因为它不能解决问题,而且我们不想处理每次更改,只处理我们的资源已释放的那些。

显然,我已将时间更改和上述代码用作演示示例,但它简洁而充分地(恕我直言)展示了我们在此尝试完成的任务。

我想使用某种缓冲区,但我在这里的知识非常有限。 谢谢

感谢到目前为止的所有答案。将开始实施它。将尝试记录成功/失败。

【问题讨论】:

    标签: c# events real-time buffering


    【解决方案1】:

    首先,有问题的事件没有被异步调用。因此,除非您在不断变化的线程上设置时间,否则设置时间的调用不会回来,并且在所有事件都处理完之前您不会再次设置它。如果你想防止这个问题,你需要将事件处理移到不同的线程。

    最终情况的复杂性以及您想要的实时性可以决定最终的答案。但是,假设您想要一些对于相对较少的线程(比如说十几个)来说相当健壮的东西,我大概会这样做。

    private var _Callbacks = new List<PropertyChangedEventHandler>();
    
    public event PropertyChangedEventHandler PropertyChanged
    {
        add
        {
            lock(_Callbacks)
                _Callbacks.Add(value);
    
            Thread Worker = new Thread(PollTime);
            Worker.Background = true;
            Worker.Start(value);
        }
        remove
        {
            lock(_Callbacks)
                _Callbacks.Remove(value);
        }
    }
    
    private void PollTime(object callback)
    {
        PropertyChangedEventHandler c = (PropertyChangedEventHandler)callback;
        string LastReported = null;
    
        while(true)
        {
            lock(_Callbacks)
                if (!_Callbacks.Contains(c))
                    return;
    
            if (LastReported != _currentTime)
            {
                LastReported = _currentTime;
                c(this, new PropertyChangedEventArgs(name));
            }
            else
                Thread.Sleep(10);
        }
    }
    
    public string currentTime
    {
        get { return _currentTime; }
        set { _currentTime= value; } 
    }
    

    这样您就可以确保事件的线程安全(以防有人在不合时宜的时间尝试订阅/取消订阅),并且每个订阅者都有自己的线程来处理回调。订阅者不会收到所有相同的事件,但是当时间更改时,他们都会收到通知。较慢的将不会获得尽可能多的事件,因为它们会丢失一些中间值。这不会通知时间是否在没有变化的情况下重置,但我认为损失不大。如果值在有限的集合内交替出现,您可能会发现问题,但随着时间的推移,这不是问题。

    有关代表、事件及其工作方式的更多信息,http://www.sellsbrothers.com/writing/delegates.htm 上有一篇很长但很好的文章

    【讨论】:

    • 如果开始报告事件中的值,请使用LastReported 以避免线程并发问题。另外值得指出的是,如果您的更新是突发的,那么每个订阅者都会获得最后一个推送的值(假设它们不会花费太长时间以至于它们与下一次突发重叠)
    【解决方案2】:

    这将是我的方法。

    1. 不要让消费者阻塞生产者的事件线程。
    2. 创建一个轻量级的“临界区”(基本上是一个原子条件变量),以便在给定时间只能激活一次消费者处理程序调用。

    这是一个实现此逻辑的完整示例。有一个EventProducer 和一个EventConsumer。它们可以根据需要配置为比彼此更快或更慢。事件生产者创建一个后台线程来引发事件。 EventConsumer 使用带有简单 TryEnter/Exit 模式的自定义 CriticalSectionSlim 类,以避免同时调用处理代码。它还使用 .NET 4.0 Task 类的默认行为将处理代码发布到线程池。如果发生异常,则在下一次调用时从主处理程序线程重新抛出。

    using System;
    using System.Globalization;
    using System.Threading;
    using System.Threading.Tasks;
    
    internal sealed class Program
    {
        private static void Main(string[] args)
        {
            using (EventProducer producer = new EventProducer(TimeSpan.FromMilliseconds(250.0d)))
            using (EventConsumer consumer = new EventConsumer(producer, TimeSpan.FromSeconds(1.0d)))
            {
                Console.WriteLine("Press ENTER to stop.");
                Console.ReadLine();
            }
    
            Console.WriteLine("Done.");
        }
    
        private static class ConsoleLogger
        {
            public static void WriteLine(string message)
            {
                Console.WriteLine(
                    "[{0}]({1}) {2}",
                    DateTime.Now.ToString("hh:mm:ss.fff", CultureInfo.InvariantCulture),
                    Thread.CurrentThread.ManagedThreadId,
                    message);
            }
        }
    
        private sealed class EventConsumer : IDisposable
        {
            private readonly CriticalSectionSlim criticalSection;
            private readonly EventProducer producer;
            private readonly TimeSpan processingTime;
    
            private Task currentTask;
    
            public EventConsumer(EventProducer producer, TimeSpan processingTime)
            {
                if (producer == null)
                {
                    throw new ArgumentNullException("producer");
                }
    
                if (processingTime < TimeSpan.Zero)
                {
                    throw new ArgumentOutOfRangeException("processingTime");
                }
    
                this.processingTime = processingTime;
                this.criticalSection = new CriticalSectionSlim();
                this.producer = producer;
                this.producer.SomethingHappened += this.OnSomethingHappened;
            }
    
            public void Dispose()
            {
                this.Dispose(true);
                GC.SuppressFinalize(this);
            }
    
            private void Dispose(bool disposing)
            {
                if (disposing)
                {
                    this.producer.SomethingHappened -= this.OnSomethingHappened;
                }
            }
    
            private void OnSomethingHappened(object sender, EventArgs e)
            {
                if (this.criticalSection.TryEnter())
                {
                    try
                    {
                        this.StartTask();
                    }
                    catch (Exception)
                    {
                        this.criticalSection.Exit();
                        throw;
                    }
                }
            }
    
            private void StartTask()
            {
                if (this.currentTask != null)
                {
                    this.currentTask.Wait();
                }
    
                this.currentTask = Task.Factory.StartNew(this.OnSomethingHappenedTask);
            }
    
            private void OnSomethingHappenedTask()
            {
                try
                {
                    this.OnSomethingHappenedImpl();
                }
                finally
                {
                    this.criticalSection.Exit();
                }
            }
    
            private void OnSomethingHappenedImpl()
            {
                ConsoleLogger.WriteLine("BEGIN: Consumer processing.");
                Thread.CurrentThread.Join(this.processingTime);
                ConsoleLogger.WriteLine("END:   Consumer processing.");
            }
        }
    
        private sealed class EventProducer : IDisposable
        {
            private readonly TimeSpan timeBetweenEvents;
            private readonly Thread thread;
            private volatile bool shouldStop;
    
            public EventProducer(TimeSpan timeBetweenEvents)
            {
                if (timeBetweenEvents < TimeSpan.Zero)
                {
                    throw new ArgumentOutOfRangeException("timeBetweenEvents");
                }
    
                this.timeBetweenEvents = timeBetweenEvents;
                this.thread = new Thread(this.Run);
                this.thread.Start();
            }
    
            public event EventHandler SomethingHappened;
    
            public void Dispose()
            {
                this.Dispose(true);
                GC.SuppressFinalize(this);
            }
    
            private void Dispose(bool disposing)
            {
                if (disposing)
                {
                    this.shouldStop = true;
                    this.thread.Join();
                }
            }
    
            private void Run()
            {
                while (!shouldStop)
                {
                    this.RaiseEvent();
                    Thread.CurrentThread.Join(this.timeBetweenEvents);
                }
            }
    
            private void RaiseEvent()
            {
                EventHandler handler = this.SomethingHappened;
                if (handler != null)
                {
                    ConsoleLogger.WriteLine("Producer is raising event.");
                    handler(this, EventArgs.Empty);
                }
            }
        }
    
        private sealed class CriticalSectionSlim
        {
            private int active;
    
            public CriticalSectionSlim()
            {
            }
    
            public bool TryEnter()
            {
                return Interlocked.CompareExchange(ref this.active, 1, 0) == 0;
            }
    
            public void Exit()
            {
                Interlocked.Exchange(ref this.active, 0);
            }
        }
    }
    

    【讨论】:

      【解决方案3】:

      你可以这样做

      一旦您收到事件,请取消注册该事件,或者换句话说,停止收听事件。

      完成事件处理后,通过重新注册事件再次开始监听事件

      public void TimeChanged(object sender, PropertyChangedEventArgs e)
      {
      
      //un register
      TimeChanged -= new PropertyChangedEventHandler(PropertyChanged};
      
      // Process lots of calculations
      
      //re-register
      TimeChanged += new PropertyChangedEventHandler(PropertyChanged};
      
      }
      

      【讨论】:

      • 您通过重复注册/注销来搞乱集合,这实际上可能会破坏事件调用代码。
      • @Haris Hasan 在特定情况下,我们不担心收藏,只担心特定项目。那会不会有问题?如果是这样,想法如何避免?对此的跟进:您认为注册/注销的时间是多少?我们的想法是“错过”尽可能少的事件
      • 我想不出在这种情况下会导致问题的任何原因。在处理过程中您已经遗漏了一些事件我认为您不会仅仅因为您正在重新注册该事件而导致大量事件
      【解决方案4】:

      我建议将所有计划任务放在一个按进程时间(DateTime)排序的队列中。时间事件(tick)只需要检查队列头部的任务是否“待处理”。也就是说,如果它的处理时间已经到达或过去。然后根据当前时间从队列中删除该任务并执行。

      任务通过在执行方法中给出的回调通知任务队列完成时(这可能也是当前时间)。任务执行时任务队列不会执行任何其他任务。当任务通知完成时,任务队列将立即检查队列头部的任务(如果有)是否处于挂起状态,依此类推。

      现在,当您有时间排序的任务队列时,可以在这里完成一个很好的改进,即当您需要执行任务时,您可以设置一个计时器来触发(或一次更改的侦听器),而不是定期滴答。队列,因为您总是知道下一个事件的时间。不需要多个侦听器和一个控制器来执行多少个等等。

      interface ITask
      {
          void Execute(ITaskCallBack callBack, DateTime currentTime);
      }
      
      interface ITaskCallBack
      {
          void OnCompleted(ITask task); // The task parameter is needed for concurrency
      }
      

      每当添加或删除任务时,下一个事件的时间就会更新。

      重要提示:如果您添加一个恰好要与现有任务同时执行的新任务,您应该在所有任务之后同时添加它。这样可以避免子任务占用调度程序。

      定时队列是您的任务调度程序/控制器。您可以根据需要使其成为单线程或多线程。尽管除非您使用多个处理器,否则我认为多线程没有多大意义。

      interface ITaskScheduler
      {
          void Add(ITask task, DateTime executeTime);
          void Remove(ITask);
      }
      

      这里的另一个很好的衍生是调度程序知道计划的时间和实际开始的时间。因此,您可以对由于加载导致的延迟任务或延迟进行有价值的诊断。如果您的系统需要性能确定性,这很重要。

      希望这有意义并且有用。

      最好的问候

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-03-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多