【问题标题】:dispatch work from many threads to one synchronous thread从多个线程分派工作到一个同步线程
【发布时间】:2013-12-20 23:27:30
【问题描述】:

假设我有 10 个线程忙于做某事,它们有时会调用一个方法

public HandleWidgets(Widget w) {  HeavyLifting(w) }

但是,我不希望我的 10 个线程在 HeavyLifting(w) 上等待,而是将 HeavyLifting(w) 工作分派给第 11 个线程,即 HeavyLifter 线程并异步继续。调度到的 HeavyLifter 线程应该始终是同一个线程,并且我不想创建多个线程(因此,我不能这样做:C# Asynchronous call without EndInvoke?)。

HeavyLifting(w) 是“一劳永逸”,因为调用 HandleWidgets() 的线程不需要回调或类似的东西。

什么是健康的做法?

【问题讨论】:

  • 你的线程设置很奇怪。与其将“繁重的工作”分配给多个线程,不如将其分配给多个线程,然后让单个线程陷入实际工作中。这与您在尝试从多个内核中获取收益时通常想要的相反
  • 我已经编辑了你的标题。请参阅“Should questions include “tags” in their titles?”,其中的共识是“不,他们不应该”。
  • 嗨,Dlev,这很不寻常。我的实际应用是十个线程与外部硬件接口,第十一个线程正在解决与从硬件整理的数据有关的问题。我不应该说更多;)

标签: c# .net multithreading


【解决方案1】:

我很惊讶这里的其他答案都没有提到TPL DataFlow。您将一堆块连接在一起并通过链发布数据。您可以显式控制每个块的并发性,因此您可以执行以下操作:

var transformBlk =
    new TransformBlock<int,int>(async i => {
        await Task.Delay(i);
        return i * 10;
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 10});

var processorBlk=
    new ActionBlock<int>(async i => {
        Console.WriteLine(i);
    },new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 1});

transformBlk.LinkTo(processorBlk);
var data = Enumerable.Range(1, 20).Select(x => x * 1000);
foreach(var x in data)
{
    transformBlk.Post(x);
}

【讨论】:

    【解决方案2】:

    基本上,您有一个线程是工作的生产者,还有一个线程是工作的消费者

    创建一个线程并在循环中从BlockingCollection 中获得Take。这是您的consumer 线程,它将调用HeavyLifting。它只会等到一个项目可用,然后处理它:

    对 Take 的调用可能会阻塞,直到可以删除项目为止。

    其他线程可以简单地将项目添加到集合中。

    请注意,BlockingCollection 不保证自行添加/删除项目的顺序:

    删除项目的顺序取决于用于创建 BlockingCollection 实例的集合类型。创建 BlockingCollection 对象时,可以指定要使用的集合类型

    【讨论】:

      【解决方案3】:

      您可以创建一个有限并发TaskScheduler 用作Task 工厂,如this example from MSDN 中提供的那样,仅限于单线程:

      var lcts = new LimitedConcurrencyLevelTaskScheduler(1);
      TaskFactory factory = new TaskFactory(lcts);
      

      实现你的功能为:

      public HandleWidgets(Widget w) 
      {  
          factory.StartNew(() => HeavyLifting(w));
      }
      

      【讨论】:

        【解决方案4】:

        创建一个在所有线程之间共享的队列和一个在所有线程之间共享的信号量。工作线程(不应等待 HeavyLifter)像这样发布请求:

        lock (queue)
        {
            queue.Enqueue(request);
            semaphore.Release();
        }
        

        HeavyLifter 是一个后台线程(不会阻止进程退出)并在无限循环中运行以下代码:

        while (true)
        {
            semaphore.WaitOne();
            Request item = null
            lock (queue)
            {
                item = queue.Dequeue();
            }
            this.ProcessRequest(item);
        }
        

        编辑:错字。

        【讨论】:

          【解决方案5】:

          --- 编辑---

          我刚刚注意到您需要“一劳永逸”,在这种情况下,单独的阻塞集合就足够了。下面的解决方案确实适用于更复杂的场景,您需要返回结果、传播异常或以某种方式(例如通过 async/await)组合任务等......


          使用TaskCompletionSource 将“同步”线程中完成的工作作为基于Task 的API 公开给“客户端”线程。

          对于HandleWidgets 的每次调用(CT = “客户端线程”,“ST” = 同步线程):

          • CT:创建一个单独的TaskCompletionSource
          • CT:将HeavyLifting 发送给ST(可能通过BlockingCollection;同时将TaskCompletionSource 传递给它,这样它就可以完成下面的最后一步)。
          • CT:将TaskCompletionSourceTask 返回给调用者而不等待ST 上的工作完成。
          • CT:正常继续。如果/当无法继续而不等待 HeavyLifting 完成(在 ST 中),请等待上述任务。
          • ST:当HeavyLifting 完成时,调用SetResult(或SetExceptionSetCanceled,视情况而定),这会解锁当前可能等待任务的所有CT。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2015-08-23
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多