【问题标题】:c# multiple threads waiting for a ManualResetEventc# 多个线程等待ManualResetEvent
【发布时间】:2017-10-27 02:05:44
【问题描述】:

我正在搞乱多线程并制作某种任务引擎。这个想法是引擎可以有可配置数量的线程等待,当一个新任务到达时,第一个空闲线程会拿起它并执行它。

问题是 2 个线程以某种方式执行相同的任务。我仔细看了一遍,我认为这段代码应该可以工作,但显然不能。如果我在现在注释掉的地方添加 10ms 睡眠,它可以工作,但我不确定我是否理解为什么。看起来 .Reset() 函数在实际重置事件之前返回?

谁能解释一下?当有多个等待时,有没有更好的方法让一个线程继续?

谢谢

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TaskTest
{
    public class Engine
    {
        private ManualResetEvent taskEvent;
        private ConcurrentQueue<Task> tasks;
        private bool running;
        private List<Thread> threads;
        private int threadAmount;
        private int threadsBusy = 0;

        public Engine(int amountOfThreads)
        {
            taskEvent = new ManualResetEvent(false);
            tasks = new ConcurrentQueue<Task>();
            threads = new List<Thread>();

            threadAmount = amountOfThreads;
        }

        public void Start()
        {
            running = true;
            for (var i = 0; i < threadAmount; i++)
            {
                var thread = new Thread(Process);
                thread.Name = "Thread " + i;
                threads.Add(thread);
                thread.Start();
            }
        }

        public void Stop()
        {
            running = false;
            taskEvent.Set();
            threads.ForEach(t => t.Join());
        }

        private void Process()
        {
            while (running)
            {
                lock (taskEvent)
                {
                    // Lock it so only a single thread is waiting on the event at the same time
                    taskEvent.WaitOne();
                    taskEvent.Reset();
                    //Thread.Sleep(10);
                }

                if (!running)
                {
                    taskEvent.Set();
                    return;
                }

                threadsBusy += 1;
                if (threadsBusy > 1)
                    Console.WriteLine("Failed");

                Task task;
                if (tasks.TryDequeue(out task))
                    task.Execute();

                threadsBusy -= 1;
            }
        }

        public void Enqueue(Task t)
        {
            tasks.Enqueue(t);
            taskEvent.Set();
        }
    }
}

编辑 其余代码:

namespace TaskTest
{
    public class Start
    {
        public static void Main(params string[] args)
        {
            var engine = new Engine(4);
            engine.Start();

            while (true)
            {
                Console.Read();
                engine.Enqueue(new Task());
            }
        }
    }
}


namespace TaskTest
{
    public class Task
    {
        public void Execute()
        {
            Console.WriteLine(Thread.CurrentThread.Name);
        }
    }
}

【问题讨论】:

  • 你想要锁(obj)吗?
  • 我不确定你的意思是什么?
  • 这并不能直接回答您的问题,但您应该使用 BlockingCollection。它已经在做你想做的事了,只是它有效
  • 这与使用 ConcurrentQueue 有何不同?
  • 另外,如果你调用Enqueue 两次,就会出现明显的竞争条件:第一个线程恢复并递增threadsBusy 并开始执行工作。第二个线程也恢复了(因为第一个线程退出了锁)并且增加了threadsBusy=>threadsBusy 现在等于 2

标签: c# multithreading manualresetevent


【解决方案1】:

在按键上使用Console.Read() 时,会从输入中读取两个字符。您应该改用Console.ReadLine()

请注意,您的代码可以通过使用BlockingCollection 来处理同步进行大量简化:

public class Engine
{
    private BlockingCollection<Task> tasks;
    private List<Thread> threads;
    private int threadAmount;

    public Engine(int amountOfThreads)
    {
        tasks = new BlockingCollection<Task>();
        threads = new List<Thread>();

        threadAmount = amountOfThreads;
    }

    public void Start()
    {
        for (var i = 0; i < threadAmount; i++)
        {
            var thread = new Thread(Process);
            thread.Name = "Thread " + i;
            threads.Add(thread);
            thread.Start();
        }
    }

    public void Stop()
    {
        tasks.CompleteAdding();
        threads.ForEach(t => t.Join());
    }

    private void Process()
    {
        foreach (var task in tasks.GetConsumingEnumerable())
        {
            task.Execute();
        }
    }

    public void Enqueue(Task t)
    {
        tasks.Add(t);
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-22
    • 1970-01-01
    相关资源
    最近更新 更多