【问题标题】:Multiple threads accessing a collection多个线程访问一个集合
【发布时间】:2017-03-30 11:44:28
【问题描述】:

我有以下情况。

我有一个接收消息的队列。当消息进入时,会创建一个新线程来处理它。

此线程获取消息并将其添加到集合中。然后它检查集合是否包含 100 个项目,如果包含则将它们发送到其他地方并清除集合。

我不能使用常规列表,因为我得到了集合被修改,枚举不能继续错误。所以我需要使用线程安全的集合。

不过,我担心的是,一个线程写入它并且它是第 100 个项目,而它将它们发送到其他地方,另一个线程会添加到集合中。使其成为 101 个项目,触发第 100 个的线程然后将其清除,我丢失了一个项目。

我不能使用并发包,因为它不明确,我无法迭代包并逐个删除,因为消息可能会进入并添加得比删除速度更快,而且它永远不会结束。

ConcurrentStack 有一个明确的,但在这种情况下会起作用吗?

一些代码来说明我的意思,HandleMeasurementMessage 发生在每条消息的新线程上。

private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

   EventHubDataBatch.Push(eventHubDatum);

   if(EventHubDataBatch.Count == 100)
   {
      /* Send them off*/
      EventHubDatabatch.Clear();
   }
}

奇怪的是,只有当我没有通过 VS2015 中的调试器运行枚举时,才会出现枚举被修改的问题。该程序运行一个小时左右就好了。如果我关闭调试器,我会收到这些枚举错误,这就是我尝试切换到线程安全集合的原因。我只是不确定哪个合适。

调用 HandleMeasurementMessage 的代码

_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
            {
                try
                {
                    HandleMeasurementMessage(msg);
                }
                catch (Exception ex)
                {
                    /* Logging stuff*/
                }
            }));

【问题讨论】:

  • 有很多可用的同步机制...你研究过.NET中的线程同步吗?
  • @rory.ap,是的,我一直在阅读 .NET 文档,但这有点令人困惑。因此这里的问题。
  • 那你的问题需要谈一下。你做了什么研究(包括链接)?它揭示了哪些主题,为什么这些信息没有帮助您解决当前的问题,或者您不了解哪些问题?
  • “当消息进入时,会创建一个新线程来处理它。” 我是否按照上面所说的那样阅读?如果您收到 100 条消息,您将有 100 个线程?消息来自哪里?
  • 只用一个简单的lock,这里不需要并发集合

标签: c# multithreading thread-safety


【解决方案1】:

我会像这样使用简单的锁:

private static readonly List<EventHubDatum> EventHubDataBatch = new List<EventHubDatum>();        
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    EventHubDatum[] toSend = null;
    lock (EventHubDataBatch) {
        EventHubDataBatch.Add(eventHubDatum);

        if (EventHubDataBatch.Count == 100) {
            // copy to local
            toSend = EventHubDataBatch.ToArray();
            EventHubDataBatch.Clear();
        }
    }

    if (toSend != null) {
        /* Send them off*/
    }
}

此处的锁定非常简短,因此它不会以任何明显的方式影响您的情况下的性能。请注意,如果有 100 个项目 - 我们将它们复制到本地数组并清除源列表以在“发送它们”操作期间不持有锁,这可能需要很长时间。

【讨论】:

  • omg....两个灵魂,一个解决方案...不错的一个..两次相同的解决方案没有用... +1
  • 是的,我已经将 100 个项目复制到另一个列表中,并将该列表发送到发送它们的方法,因为它是异步的并且可能有点慢(可能)
  • @Evk 这种模式有官方名称吗?我经常使用它,如果能给它起个名字就太好了。
  • @JeroenvanLangen 不知道,对我来说这只是“有意义”。而您提供相同解决方案的事实只是证实了这一点:)
  • @Evk 对于处理网络消息或日志消息非常有用。很好的解决方案并且工作正常。 ConcurrentQueue.Dispose() ;-)
【解决方案2】:

使用AutoResetEvent 之类的同步对象,一次只允许一个线程访问集合。

示例用法:

static AutoResetEvent MeasureMessageEvent = new AutoResetEvent(true);

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    // Wait for exclusive access
    MeasureMessageEvent.WaitOne();

    EventHubDataBatch.Push(eventHubDatum);

    if(EventHubDataBatch.Count == 100)
    {
       /* Send them off*/
       EventHubDatabatch.Clear();
    }

    // Release exclusive access
    MeasureMessageEvent.Set();
}

【讨论】:

  • EventHubdataBatch 必须是什么集合类型?
  • 集合类型无关。
  • 是的,如果您查看上面的代码,AutoResetEvent 对象与集合类型没有任何关系。重要的是,在修改集合之前调用 WaitOne() 并确保在之后释放调用 Set() 函数的访问权限。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-08-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多