【发布时间】:2017-03-30 11:44:28
【问题描述】:
我有以下情况。
我有一个接收消息的队列。当消息进入时,会创建一个新线程来处理它。
此线程获取消息并将其添加到集合中。然后它检查集合是否包含 100 个项目,如果包含则将它们发送到其他地方并清除集合。
我不能使用常规列表,因为我得到了集合被修改,枚举不能继续错误。所以我需要使用线程安全的集合。
不过,我担心的是,一个线程写入它并且它是第 100 个项目,而它将它们发送到其他地方,另一个线程会添加到集合中。使其成为 101 个项目,触发第 100 个的线程然后将其清除,我丢失了一个项目。
我不能使用并发包,因为它不明确,我无法迭代包并逐个删除,因为消息可能会进入并添加得比删除速度更快,而且它永远不会结束。
ConcurrentStack 有一个明确的,但在这种情况下会起作用吗?
一些代码来说明我的意思,HandleMeasurementMessage 发生在每条消息的新线程上。
private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
/* Do a bunch of stuff to msg */
EventHubDataBatch.Push(eventHubDatum);
if(EventHubDataBatch.Count == 100)
{
/* Send them off*/
EventHubDatabatch.Clear();
}
}
奇怪的是,只有当我没有通过 VS2015 中的调试器运行枚举时,才会出现枚举被修改的问题。该程序运行一个小时左右就好了。如果我关闭调试器,我会收到这些枚举错误,这就是我尝试切换到线程安全集合的原因。我只是不确定哪个合适。
调用 HandleMeasurementMessage 的代码
_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
{
try
{
HandleMeasurementMessage(msg);
}
catch (Exception ex)
{
/* Logging stuff*/
}
}));
【问题讨论】:
-
有很多可用的同步机制...你研究过.NET中的线程同步吗?
-
@rory.ap,是的,我一直在阅读 .NET 文档,但这有点令人困惑。因此这里的问题。
-
那你的问题需要谈一下。你做了什么研究(包括链接)?它揭示了哪些主题,为什么这些信息没有帮助您解决当前的问题,或者您不了解哪些问题?
-
“当消息进入时,会创建一个新线程来处理它。” 我是否按照上面所说的那样阅读?如果您收到 100 条消息,您将有 100 个线程?消息来自哪里?
-
只用一个简单的
lock,这里不需要并发集合
标签: c# multithreading thread-safety