【问题标题】:Thread-safe queue with tasks带有任务的线程安全队列
【发布时间】:2017-05-30 09:01:38
【问题描述】:

我在 c# 中有某种工作管理器,它将接收要执行的任务并执行它们。任务将来自不同的线程,但它们必须按照接收顺序同时执行一个。 我不想要一个 while 循环,它会一直运行,检查它们是否是队列中的新任务。是否有内置队列或实现队列的简单方法,该队列将等待任务并同步执行它们而无需忙于等待?

【问题讨论】:

  • 我不认为这种模式有构建功能,但你可以通过 SemaphoreSlim 构建一个没有 while 循环的模式
  • 你检查ConcurrentQueue<T>和/或Queue.Synchronized了吗?
  • TPL 数据流是一种可行的方法,考虑到您的要求是它并非一直在运行。您可以控制它的执行方式(在线程等方面),所以我认为您应该能够确保一次只运行一个。但请注意不要创建积压。
  • “我不想要一个while循环”你想在线程池上运行它们吗?我制作了一个等待执行项目的 WorkerThread。 (它将被通知开始。它也支持批处理。但它会产生一个带有 while 循环的线程)
  • BlockingCollection(它是 TPL DataFlow 的基本元素)是您正在寻找的。尽管看起来像,但也没有忙着等待。等待由事件完成,没有 cpu 时间成本 => 不忙但等待

标签: c# multithreading queue task


【解决方案1】:

根据 cmets,您应该查看 ConcurrentQueue 以及 BlockingCollection 并使用 GetConsumingEnumerable() 而不是不需要的 WHILE 循环

BlockingCollection<YourClass> _collection =
            new BlockingCollection<YourClass>(new ConcurrentQueue<YourClass>());

_collection.Add() can be called from multiple threads

您可以在单独的线程上使用

foreach (var message in _collection.GetConsumingEnumerable())
{}

【讨论】:

  • 请求是同时做一个
  • 他的句子不清楚“但它们必须同时执行一个”。我假设他希望他们一次按顺序执行一个
  • "但它们必须同时执行一个"
  • @DanielTshuva 我想我们在这里说的是同一件事。消息将按照收到的顺序一次处理一条。
【解决方案2】:

您可以使用 SemaphoreSlim (https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim(v=vs.110).aspx) 和 ConcurrentQueue

例子:

    private delegate void TaskBody();

    private class TaskManager
    {
        private ConcurrentQueue<TaskBody>
            TaskBodyQueue = new ConcurrentQueue<TaskBody>();

        private readonly SemaphoreSlim 
            TaskBodySemaphoreSlim = new SemaphoreSlim(1, 1);

        public async void Enqueue(TaskBody body)
        {
            TaskBodyQueue.Enqueue(body);

            await TaskBodySemaphoreSlim.WaitAsync();

            Console.WriteLine($"Cycle ...");

            if (TaskBodyQueue.TryDequeue(out body) == false) {
                throw new InvalidProgramException($"TaskBodyQueue is empty!");
            }

            body();

            Console.WriteLine($"Cycle ... done ({TaskBodyQueue.Count} left)");

            TaskBodySemaphoreSlim.Release();
        }
    }

    public static void Main(string[] args)
    {
        var random = new Random();
        var tm = new TaskManager();

        Parallel.ForEach(Enumerable.Range(0, 30), async number => {
            await Task.Delay(100 * number);

            tm.Enqueue(delegate {
                Console.WriteLine($"Print {number}");
            });
        });

        Task
            .Delay(4000)
            .Wait();

        WaitFor(action: "exit");
    }

    public static void WaitFor(ConsoleKey consoleKey = ConsoleKey.Escape, string action = "continue")
    {
        Console.Write($"Press {consoleKey} to {action} ...");

        var consoleKeyInfo = default(ConsoleKeyInfo);

        do {
            consoleKeyInfo = Console.ReadKey(true);
        }
        while (Equals(consoleKeyInfo.Key, consoleKey) == false);

        Console.WriteLine();
    }

【讨论】:

  • 谢谢。这看起来是一个很好的解决方案,但 BlockingCollection 似乎更容易解决这个问题。
猜你喜欢
  • 1970-01-01
  • 2015-09-17
  • 1970-01-01
  • 1970-01-01
  • 2012-11-05
  • 1970-01-01
  • 1970-01-01
  • 2010-10-07
  • 1970-01-01
相关资源
最近更新 更多