【问题标题】:TaskFactory, Starting a new Task when one endsTaskFactory,结束时启动一个新任务
【发布时间】:2015-11-22 09:52:10
【问题描述】:

我找到了许多使用TaskFactory 的方法,但我找不到任何关于开始更多任务以及观察一个任务何时结束并开始另一个任务的任何信息。

我总是希望有 10 个任务可以工作。

我想要这样的东西

int nTotalTasks=10;
int nCurrentTask=0;

Task<bool>[] tasks=new Task<bool>[nThreadsNum];

for (int i=0; i<1000; i++)
{
  string param1="test";
  string param2="test";

  if (nCurrentTask<10) // if there are less than 10 tasks then start another one
    tasks[nCurrentThread++] = Task.Factory.StartNew<bool>(() =>
    {
       MyClass cls = new MyClass();
       bool bRet = cls.Method1(param1, param2, i); // takes up to 2 minutes to finish
       return bRet;
    });

  // How can I stop the for loop until a new task is finished and start a new one?
}

【问题讨论】:

  • 您是否查看过WaitAny() 方法或尝试等待task.Result
  • 任务是IO绑定还是CPU绑定?

标签: c# multithreading task-parallel-library task threadpool


【解决方案1】:

答案取决于要调度的任务是受 CPU 限制还是 I/O 限制。

对于 CPU 密集型工作,我将使用 Parallel.For() API 通过 ParallelOptionsMaxDegreeOfParallelism 属性设置线程/任务数

对于 I/O 绑定的工作,并发执行任务的数量可能会明显大于可用 CPU 的数量,因此策略是尽可能依赖异步方法,从而减少等待完成的线程总数.

如何在新任务完成之前停止 for 循环并启动 新的?

可以使用 await 来限制循环:

    static void Main(string[] args)
    {
        var task = DoWorkAsync();
        task.Wait();

        // handle  results
        // task.Result;

        Console.WriteLine("Done.");
    }

    async static Task<bool> DoWorkAsync()
    {
        const int NUMBER_OF_SLOTS = 10;

        string param1="test";
        string param2="test";

        var results = new bool[NUMBER_OF_SLOTS]; 

        AsyncWorkScheduler ws = new AsyncWorkScheduler(NUMBER_OF_SLOTS);
        for (int i = 0; i < 1000; ++i)
        {
            await ws.ScheduleAsync((slotNumber) => DoWorkAsync(i, slotNumber, param1, param2, results));
        }

        ws.Complete();
        await ws.Completion;
    }

    async static Task DoWorkAsync(int index, int slotNumber, string param1, string param2, bool[] results)
    {
      results[slotNumber] = results[slotNumber} && await Task.Factory.StartNew<bool>(() =>
      {
          MyClass cls = new MyClass();
          bool bRet = cls.Method1(param1, param2, i); // takes up to 2 minutes to finish
          return bRet;
      }));

    }

帮助类 AsyncWorkScheduler 使用 TPL.DataFlow 组件以及 Task.WhenAll()

class AsyncWorkScheduler
{
    public AsyncWorkScheduler(int numberOfSlots)
    {
        m_slots = new Task[numberOfSlots];
        m_availableSlots = new BufferBlock<int>();
        m_errors = new List<Exception>();
        m_tcs = new TaskCompletionSource<bool>();
        m_completionPending = 0;

        // Initial state: all slots are available
        for(int i = 0; i < m_slots.Length; ++i)
        {
            m_slots[i] = Task.FromResult(false);
            m_availableSlots.Post(i);
        }
    }

    public async Task ScheduleAsync(Func<int, Task> action)
    {
        if (Volatile.Read(ref m_completionPending) != 0)
        {
            throw new InvalidOperationException("Unable to schedule new items.");
        }

        // Acquire a slot 
        int slotNumber = await m_availableSlots.ReceiveAsync().ConfigureAwait(false);

        // Schedule a new task for a given slot
        var task = action(slotNumber);

        // Store a continuation on the task to handle completion events
        m_slots[slotNumber] = task.ContinueWith(t => HandleCompletedTask(t, slotNumber), TaskContinuationOptions.ExecuteSynchronously);
    }


    public async void Complete()
    {
        if (Interlocked.CompareExchange(ref m_completionPending, 1, 0) != 0)
        {
            return;
        }

        // Signal the queue's completion
        m_availableSlots.Complete();

        await Task.WhenAll(m_slots).ConfigureAwait(false);

        // Set completion
        if (m_errors.Count != 0)
        {
            m_tcs.TrySetException(m_errors);
        }
        else
        {
            m_tcs.TrySetResult(true);
        }

    }

    public Task Completion
    {
        get
        {
            return m_tcs.Task;
        }
    }



    void SetFailed(Exception error)
    {
        lock(m_errors)
        {
            m_errors.Add(error);
        }

    }

    void HandleCompletedTask(Task task, int slotNumber)
    {
       if (task.IsFaulted || task.IsCanceled)
       {
           SetFailed(task.Exception);
           return;
       }

       if (Volatile.Read(ref m_completionPending) == 1)
       {
           return;
       }


        // Release a slot
        m_availableSlots.Post(slotNumber);
    }

    int m_completionPending;
    List<Exception> m_errors;
    BufferBlock<int> m_availableSlots;
    TaskCompletionSource<bool> m_tcs;
    Task[] m_slots;

}

【讨论】:

    【解决方案2】:

    为此,我将结合使用 Microsoft 的响应式框架(NuGet“Rx-Main”)和 TPL。它变得非常简单。

    代码如下:

    int nTotalTasks=10;
    string param1="test";
    string param2="test";
    
    IDisposable subscription =
        Observable
            .Range(0, 1000)
            .Select(i => Observable.FromAsync(() => Task.Factory.StartNew<bool>(() =>
            {
                MyClass cls = new MyClass();
                bool bRet = cls.Method1(param1, param2, i); // takes up to 2 minutes to finish
                return bRet;
            })))
            .Merge(nTotalTasks)
            .ToArray()
            .Subscribe((bool[] results) =>
            {
                /* Do something with the results. */
            });
    

    这里的关键部分是.Merge(nTotalTasks),它限制了并发任务的数量。

    如果您需要中途停止处理,只需致电subscription.Dispose(),一切都会为您清理干净。

    如果您想在生成每个结果时对其进行处理,您可以更改 .Merge(...) 中的代码,如下所示:

            .Merge(nTotalTasks)
            .Subscribe((bool result) =>
            {
                /* Do something with each result. */
            });
    

    【讨论】:

      【解决方案3】:

      你见过BlockingCollection 类吗?它允许您让多个线程并行运行,并且您可以等待一个任务的结果来执行另一个任务。查看更多信息here

      【讨论】:

      • 不,很有趣。
      【解决方案4】:

      查看Task.WaitAny 方法:

      等待任何提供的 Task 对象完成执行。

      文档中的示例:

      var t1 = Task.Factory.StartNew(() => DoOperation1());
      var t2 = Task.Factory.StartNew(() => DoOperation2());
      
      Task.WaitAny(t1, t2)
      

      【讨论】:

      • 我怎么知道哪个任务完成了?,因为我需要知道完成任务的返回值
      • @MarioM - 与没有WaitAny 调用的方式相同。 Task.Result
      • @MarioM WaitAny() 返回任务输入数组中的索引
      【解决方案5】:

      这应该是你所需要的,不是完整的,但你需要做的就是等待第一个完成,然后运行第二个。

      Task.WaitAny(task to wait on);
      
      Task.Factory.StartNew()
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-11-08
        • 2015-04-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多