【问题标题】:ManualResetEvent - Backlog of eventsManualResetEvent - 事件积压
【发布时间】:2013-11-19 12:24:39
【问题描述】:

我在 ManualResetEvent 和事件积压方面遇到了一些问题。我的应用程序正在订阅消息,然后运行一个很长的任务。

我遇到的问题是我收到的消息超出了我的处理能力。任务大约需要 5 秒来处理,但我每 2-3 秒就会收到一条新消息。

理想情况下,我想做的是忽略任何新事件,直到我完成任务处理然后再次开始“监听”。目前,我似乎正在按接收和处理的顺序积压事件。正如您可以想象的那样,几个小时后,正在处理的消息已经很旧了。

我无法从多个线程运行长时间运行的任务。

也许我需要某种排队机制,然后清除最后一条消息(Last On First Off)并删除队列?

有什么想法吗?

我还在长期运行过程结束时调用 ManualResetEvent.Set() - 根据我的研究,我知道这是正确的吗?我应该在长时间运行的任务开始时 Reset() 导致线程阻塞,然后在结束时 Set() 吗?

【问题讨论】:

  • 为什么不:接收消息 -> 取消订阅 -> 处理消息 -> 重新订阅?
  • 嗨,我确实想到了,但订阅/取消订阅是由第 3 方提供的,并引发了错误。我会联系他们,但希望作为一种快速的解决方法,我可以创建一个队列然后排在首位。
  • 我已经开始使用后台线程来完成工作并添加了逻辑if(bw.IsBusy != true) do_work, else continue listening for next subscription

标签: c# multithreading events .net-4.0


【解决方案1】:

创建一个循环缓冲区,将其视为 LIFO 队列(堆栈)。因此,假设您希望队列中最多有 10 个条目:

const int MaxItems = 10;
Item[] theQueue = new Item[];
int insertPoint = 0;
object myLock = new object();
// initialize the array to all NULL.

void Enqueue(Item t)
{
    lock (myLock)
    {
        theQueue[insertPoint] = t;
        insertPoint = (insertPoint+1) % 10;
    }
}

Item Dequeue()
{
    lock (myLock)
    {
        int takeFrom = insertPoint-1;
        if (takeFrom < 0)
            takeFrom = MaxItems-1;
        if (theQueue[takeFrom] != null)
        {
            var rslt = theQueue[takeFrom];
            insertPoint = takeFrom;
            return rslt;
        }
        // queue is empty. Either return null or throw an exception.
        return null;
    }
}

当然,您会希望将所有这些都包装成一个漂亮的对象。但这是基本思想。

【讨论】:

    【解决方案2】:

    这个怎么样:

    在任何给定时间,您都有一条消息正在处理中,还有一条消息等待处理。

    当您收到一条新消息时,您会覆盖等待处理的消息。

    您的处理线程一直等到有消息等待处理,然后将其标记为正在处理,处理它,然后重新启动。

    添加一些同步逻辑,你会得到这个代码:

    private object _sync = new object();
    private Message _beingProcessed;
    private Message _waitingToBeProcesssed;
    
    public void OnMessageReceived(Message message)
    {
        lock(sync)
        {
            _waitingToBeProcesssed = message;
            Monitor.Pulse(sync);
        }
    }
    
    public void DoWork()
    {
        while (true)
        {
            lock (sync)
            {
                while (_waitingToBeProcesssed == null)
                {
                    Monitor.Wait(sync);
                }
    
                _beingProcessed = _waitingToBeProcesssed;
                _waitingToBeProcesssed = null;
            }
    
            Process(_beingProcessed); //Do the actual work
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-04-09
      • 1970-01-01
      • 2011-01-01
      • 1970-01-01
      • 2013-01-14
      • 2013-08-09
      • 2019-10-06
      • 2016-06-22
      相关资源
      最近更新 更多