【问题标题】:Queue actions for a plain thread class?普通线程类的队列操作?
【发布时间】:2015-05-28 20:17:01
【问题描述】:

因为我无法捕获RaceOnRCWCleanup,并且因为我知道我在没有额外安全性的情况下使用来自多个线程的 COM 对象进行脏编程。我需要问:

问题:如何允许其他线程将工作项/操作排入我的工作线程(如 Invoke/BeginInvoke for Control)?基本上另一个线程应该能够告诉MyThread“做这个!做那个!”并执行no操作来代替MyThread

我还需要使用参数和返回值。资源(COM 对象)只能由这个线程使用。我考虑过创建一个要执行的操作枚举的队列,但我不知道如何添加参数等。我的第一个设计看起来像下面的代码。


编辑:线程对象外表简单对我来说也很重要。所以我想调用像myThread.Connect(); 这样的方法而不考虑任何代表和东西。线程对象应在后台使用委托或其他技术。现在我正在使用一个带有额外类的队列,该类存储被调用的方法、参数以及稍后的结果。结果通过“完成的事件”传递。我认为这不是一个好方法,因为对线程的所有“动作”调用都是异步的。
public class MyThread
{
    // private

    private enum MyEnum { Connect, Disconnect }
    private Queue<MyEnum> queue = new Queue<MyEnum>();

    private void Run()
    {
        // loops, does work and can regularly check a queue
    }

    // public

    public void Connect(string address, int parameter1, object parameter2)
    {
        // triggers connection to a db or somthing similar
        // internally it queues an action for this thread to perform
    }

    public object Disconnect()
    {
        // triggers disconnect from a db or something similar
        // internally it queues an action for this thread to perform
        // but must wait for result
    }

    public void Start()
    {
        // starts the thread
    }

    public void Stop()
    {
        // stops the thread
    }
}

【问题讨论】:

标签: c# multithreading


【解决方案1】:

这对自己构建很棘手,但通常通过以下几行的委托来完成:

public class MyThread
{
    private ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();

    private void Run()
    {
        Action action;
        if (queue.TryDequeue(out action))
            action();
    }

    public void Connect(Action action)
    {
        queue.Enqueue(action);
    }
}

在变量上使用带有闭包的参数示例:

        var t = new MyThread();
        var myVar = 3;
        t.Connect(delegate()
        {
            Console.WriteLine(myVar);
        });

如果您需要返回一个值,它会变得更加棘手:) 您的队列需要更改为可以同时包含 Action 和 Func 的类型。然后你可以添加一个类似的方法。

public void object Connect(Func<object> function)

您还需要某种线程等待,例如 ManualResetEvent,以便 Connect() 的调用者可以等待响应。

【讨论】:

  • 代表可能是最干净的方式。但是如何处理参数和结果呢?使用Action,您两者都没有。
  • 我添加了一个如何添加参数的示例。返回一个值变得更加棘手:)
【解决方案2】:

一个简单的解决方案是创建一个单线程 TPL TaskScheduler 并将任务排队。内置ConcurrentExclusiveSchedulerPair可能能够保证单个固定线程;您需要查看文档以确认这一点。如果没有,there are others such as QueuedTaskScheduler.

【讨论】:

    【解决方案3】:

    正如@usr 所说,创建一个单线程TaskScheduler 将是最好的选择。 ConcurrentExclusiveSchedulerPair.ExclusiveScheduler 保证它以独占方式执行,但不是在单线程中执行所有任务。它可以使用任何线程来执行Task。如果需要单个固定线程,需要自己实现。

    Custom scheduler topic in Msdn 展示了如何实现LimitedConcurrencyLevelTaskScheduler。我已经对其进行了修改,使其成为“单线程”。

    internal class SingleThreadedTaskScheduler : TaskScheduler
    {
        private readonly LinkedList<Task> tasks = new LinkedList<Task>(); // protected by lock(_tasks) 
    
        public SingleThreadedTaskScheduler()
        {
            RunThread();
        }
    
        protected sealed override void QueueTask(Task task)
        {
            lock (tasks)
            {
                tasks.AddLast(task);
                Monitor.Pulse(tasks);
            }
        }
    
        private void RunThread()
        {
            new Thread(_ =>
            {
                while (true)
                {
                    Task item;
                    lock (tasks)
                    {
                        if (tasks.Count == 0)
                        {
                            Monitor.Wait(tasks);
                        }
                        item = tasks.First.Value;
                        tasks.RemoveFirst();
                    }
                    base.TryExecuteTask(item);
                }
            }).Start();
        }
    
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
    
        protected sealed override bool TryDequeue(Task task)
        {
            lock (tasks) 
                return tasks.Remove(task);
        }
    
        public sealed override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }
    
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(tasks, ref lockTaken);
                if (lockTaken) return tasks;
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(tasks);
            }
        }
    }
    

    然后将其用作

    private static readonly TaskScheduler singleThreadedScheduler = new SingleThreadedTaskScheduler();
    
    private static void Main()
    {
        for (int i = 0; i < 10; i++)
        {
            int index = i;
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(100);//Simulate some work
                Console.WriteLine("Task id {0} Executed in Thread Id {1}", index, Thread.CurrentThread.ManagedThreadId);
            }, CancellationToken.None, TaskCreationOptions.None, singleThreadedScheduler);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-10-28
      • 2016-05-11
      • 1970-01-01
      相关资源
      最近更新 更多