【问题标题】:Schedule tasks in order按顺序安排任务
【发布时间】:2014-05-10 19:49:48
【问题描述】:

我的任务如下:

  1. 任务 taskInput 以获取数据到 inputQueue
  2. 任务 taskOutput 将数据从 inputQueue 抓取到 outputQueue
  3. 使用 outputQueue 中的数据的并行任务

我想运行符合条件的任务:

先运行 taskInput,再运行 taskOutput,最后运行 Consumer。

对应代码:

            // Get data to inputQueue
            Task taskInput = new Task(()=>AddingItemToInputQueue());
            taskInput.Start();  
            // Grab data from inputQueue to outputQueue.
            Task taskOutput = new Task(() => AddItemToOutputQueue());
            taskOutput.Start();

            // Parallel tasks for consume data from outputQueue
            int threadCount = n;
            Task[] workers = new Task[threadCount];
            for (int i = 0; i < threadCount; ++i)
            {
                Task task=Task.Run(()=>Consumer(i));
                workers[i] = task;
            }
            Task.WaitAll(workers);

关于inputQueue和outputQueue:

    BlockingCollection<Messages> InputQueue = new BlockingCollection<Messages>();
    BlockingCollection<Messages> OutputQueue = new BlockingCollection<Messages>();

我的问题:

  1. 安排任务。我以为我们可以使用 Task.ContinueWith Method bot 不确定如何将其应用于 Consumer
  2. 我不确定它是否是线程安全的,因为在运行 Consumer 时可能会将新项目添加到 inputQueue。

【问题讨论】:

  • Start后你用taskInputtaskOutput吗?
  • 我不确定你的意思,但消费者会使用 outputQueue 中的数据。 outputQueue 中的数据来自 taskOutut。
  • 我的意思是,如果您在启动 taskInputtaskOutput 对象后使用它们。或者你开始对这些对象不做任何事情
  • 我开始对这些对象什么都不做。
  • 这是C# 4.0 问题还是C# 5.0 问题?它看起来像一个C# 5.0 问题,因为您使用的是Task.Run。在这种情况下,不要使用Task 构造函数,只需使用Task.Run

标签: .net c#-4.0 scheduled-tasks c#-5.0


【解决方案1】:

async-await 可以这样写:

async Task DoStuffAsync()
{
    // Get data to inputQueue
    await Task.Run(()=>AddingItemToInputQueue());

    // Grab data from inputQueue to outputQueue.
    await Task.Run(() => AddItemToOutputQueue());

    // Parallel tasks for consume data from outputQueue
    int threadCount = n;
    Task[] workers = new Task[threadCount];
    for (int i = 0; i < threadCount; ++i)
    {
        Task task=Task.Run(()=>Consumer(i));
        workers[i] = task;
    }

    await Task.WhenAll(workers);
}

但是由于您需要将 AddingItemToInputQueueAddItemToOutputQueueTasks 包装起来以使它们异步,因此您只是在增加开销。 Consumer 也是如此。

你不妨这样做:

void DoStuff()
{
    // Get data to inputQueue
    AddingItemToInputQueue();

    // Grab data from inputQueue to outputQueue.
    AddItemToOutputQueue();

    // Parallel tasks for consume data from outputQueue
    int threadCount = n;
    Parallel.For(0, n, i => Consumer(i));
}

【讨论】:

  • --@Paulo:我不确定 Parallel 是否是线程安全的,在迭代 Consumer 时,可能会将新项目添加到 OutputQueue。会抛出异常吗?
  • 我不明白你的意思。需要线程安全的不是Parallel。这是Compute 或需要线程安全的队列。调用AddItemToOutputQueue后,项目不应该全部在队列中吗?
  • --@Paulo,我也有类似的海报,请查看@Servy 在stackoverflow.com/questions/22688679/…的评论
【解决方案2】:

您可以尝试启动第一个任务 (taskInput),完成后,继续执行第二个任务 (taskOutput),除非需要并行工作。在这种情况下,您必须分别启动两者,就像您已经在做的那样。

Task.Run(() => AddingItemToInputQueue())
            .ContinueWith(task => AddItemToOutputQueue());

同时,启动任务以消耗来自outputQueue的数据

// Parallel tasks for consume data from outputQueue
int threadCount = n;
Task[] workers = new Task[threadCount];
for (int i = 0; i < threadCount; ++i)
{
    Task task = Task.Run(() => Consumer(i));
    workers[i] = task;
}
Task.WaitAll(workers);

或者你可以试试这样的:

Task.Run(() => AddingItemToInputQueue())
    .ContinueWith(x => AddItemToOutputQueue())
    .ContinueWith(t =>
    {
        int threadCount = n;
        Task[] workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            Task task = Task.Run(() => Consumer(i));
            workers[i] = task;
        }
        Task.WaitAll(workers);
    });

在这种情况下,它将按照您的意愿运行:首先是taskInput,然后是taskOutput,最后是Consumer

BlockingCollection 是线程安全的,因此您可以在多任务中添加和删除数据,它会在必要时自行管理阻塞。

您可以查看更多关于Task.Factory.StartNewTask.Runhere的信息

【讨论】:

  • 我们可以用 Task.Run 替换 Task.Factory.StartNew 吗?听说使用 Task.Factory.StartNew 很危险。
  • 是的,实际上这是最好的方法。你可以在这里查看更多信息:blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx
  • 注意:如果因为Task.WaitAll而从 UI 线程调用,这将阻塞 UI。在这种情况下,只需在 Task.WhenAll 上等待即可。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-03-29
  • 1970-01-01
  • 1970-01-01
  • 2011-08-03
  • 2017-04-19
相关资源
最近更新 更多