【发布时间】:2013-02-16 15:28:16
【问题描述】:
我正在尝试找到一种方法来处理多个线程中的队列,动态调整消费者的数量。基本上这个任务是众所周知的:多个生产者创建消息并将它们提交到一个队列中,多个消费者处理来自队列的消息。现在,我考虑了使用不同组件(如 System.Collections.Queue.Synchronized、System.Collections.Concurrent.ConcurrentQueue 和 System.Collections.Concurrent.BlockingCollection)的不同方法,但我无法决定如何正确使用最高效率,因此我很高兴通过您的意见收到一些好主意。
以下是更多详细信息:
- 在某些情况下,消息速率预计会非常密集,但处理会相对简单;
- 我不知道我应该拥有多少消费者;
- 我希望进程调整当前消费者的数量,而不是阻止它们,具体取决于排队的消息数量(这意味着我想为每百条消息填充额外的消费者 fe,并且消费者应该如果排队的消息数量比填充它所需的数量少 50 条,则停止,当消息量超过 300 条时将填充第三个消费者,当它下降到 250 条时它应该停止)。
这就是想法。现在,我考虑将 ConcurrentQueue 包装到一个类中,该类将封装 Enqueue 方法,并检查入队后的消息数量,并决定是否启动额外的消费者。并且消费者应该在循环中进行检查,该检查应该决定停止它。我认为您会提出一些更有趣的解决方案。
顺便说一句,我仍然不知道如何处理的情况之一是理论上当最后一条消息正在排队并且同时最后一个消费者已经停止。另一种情况也是关于停止 - 如果多个消费者同时进行停止检查,他们将被停止。我应该如何处理这些情况?
为了说明我的意思,请考虑以下示例:
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
int amountOfConsumers;
public void Enqueue(IMessage message)
{
messageQueue.Add(message); // point two
if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
{
Task.Factory.StartNew(() =>
{
IMessage msg;
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
{
msg = messageQueue.Take();
//process msg...
}
ConsumerQuit(); // point four
});
Interlocked.Increment(ref amountOfConsumers);
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}
所以现在当我可以指出具体的代码行时,这些是问题:
- 当最后一个消费者发现没有消息入队时(@point one),在它调用ConsumerQuit方法之前,最后一条消息到达并入队,然后检查额外的消费者,结果是(@第三点)仍然有一个消费者在工作,因为单个消息的一个消费者绰绰有余 - 没有任何反应,最后调用了 ConsumerQuit,我有一条消息卡在队列中。
ConsumerTask | LastMessageThread ------------------------------------------------------ @point one(messageQueue.Count=0) | @point two no time | @point three(amountOfConsumers=1) @point four | ended; ended; | ended;
- 几个消费者同时到达“第一点”检查,当其中一个应该停止时(fe messageQueue.Count 为 249),其中几个将停止,因为在其中一个调用 ConsumerQuit 之前,其他几个会执行也进行此项检查。
ConsumerTask1 | ConsumerTask2| ConsumerTask3 | ConsumerTask4| ------------------------------------------------------------------------------ @point one(.Count=249;amount=4)| no time | no time | @point one | no time | @point one | processing msg| @point four | @point four | no time | @point one | ended; | ended; | @point four | processing msg| ended; | ended; | ended; | ... | ended; |在这里,如果最后一条消息已经入队,我们还剩下一个消费者任务,它必须单独处理 249 条消息,但最坏的情况可能是所有消息都停止,在最后一条消息之后,可能有数百条消息会卡住。
【问题讨论】:
-
使用内核同步 - BlockingCollection 等。如果性能不足,请回调。
-
您的问题可以通过使用
ThreadPool类并调整最小和最大线程数来解决。见this MSDN resource -
嗯,我不认为
ThreadPool中的线程是短暂的,否则最小线程数是什么意思?由于线程旨在运行定义明确的任务,因此线程可能只是在无事可做时进入睡眠状态。 -
@didierc 想了想,但没有人问,这需要一个接一个地发送消息以供某个将不断运行的线程处理,想防止这种情况发生。另外,我不确定当您继续触发短寿命线程时开销有多大。你知道或可以估计吗?
-
此外,按照传入消息数量的比例增加运行线程的数量不会让它们更快地处理。在超过进程可用的最大物理内核数后,性能将开始下降,因为线程管理系统将开始消耗越来越多的 CPU 资源来完成其工作。然而,如果你知道你永远不会达到那个最大值,那么你就不会有这个问题,但我怀疑你会得到比
ThreadPool类更好的性能。
标签: c# multithreading thread-safety threadpool message-queue