【问题标题】:Notify task when other tasks complete其他任务完成时通知任务
【发布时间】:2016-11-22 19:09:08
【问题描述】:

.Net TPL 专家,

注意:不能使用DataFlow库;不允许加载项。

我有四个任务如下图所示:

  • task_1 (data_producer) -> 从大文件(>500000 条记录)中读取记录并将记录添加到 BlockingCollection

  • task_2, task_3 (data_consumers) -> 这些任务中的每一个都从 BlockingCollection 获取记录。每个任务对取自 BlockingCollection(网络相关)的记录执行一些工作,完成后,每个任务都可以将记录添加到结果队列中。处理顺序并不重要。

  • task_4(结果处理器)-> 从 results_queue 中获取记录并写入输出文件。

然后我等待任务完成,即:

Task.WhenAll( t1, t2, t3, t4 )

所以我有一个生产者任务、多个消费者任务和一个保存结果的任务。

我的问题是:

如何在任务 2 和 3 完成时通知任务 4,以便任务 4 也知道何时结束?

我发现了许多以线性“管道”方式将数据从一个任务“移动”到另一个任务的示例,但没有找到任何示例来说明上述情况;也就是任务2和3完成后如何通知任务4,让它也知道什么时候完成。

我最初的想法是在任务 4 中“注册”任务 2 和 3,并简单地监控每个注册任务的状态——当任务 2 和 3 不再运行时,任务 4 可以停止(如果结果队列是也是空的)。

提前致谢。

【问题讨论】:

  • 您无法为您的项目添加NuGet Package for TPL Dataflow
  • 正确 -- 对于这个特定项目,不允许使用 TPL 数据流。
  • TPL 数据流现在内置在 .NET 平台 (.NET Core) 中

标签: .net task-parallel-library tpl-dataflow


【解决方案1】:

这是对Thomas 已经说过的内容的一点扩展。

通过使用BlockingCollection,您可以在其上调用GetConsumingEnumerable(),并将其视为普通的foreach 循环。这将使您的任务“自然”结束。您唯一需要做的就是添加一个额外的任务来监视任务 2 和 3 以查看它们何时结束并调用完整的添加。

private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();

Task RunProcess()
{
    Task1Start();
    var t2 = Stage2Start();
    var t3 = Stage2Start();
    Stage2MonitorStart(t2,t3);
    retrun Task4Start();
}

public void Task1Start()
{
    Task.Run(()=>
    {
        foreach(var item in GetFileSource())
        {
            var processedItem = Process(item);
            _stageOneBlockingCollection.Add(processedItem);
        }
        _stageOneBlockingCollection.CompleteAdding();
    }
}

public Task Stage2Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage2(item);
            _stageTwoBlockingCollection.Add(processedItem);
        }
    }
}

void Stage2MonitorStart(params Task[] tasks)
{
    //Once all tasks complete mark the collection complete adding.
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding());
}

public Task Stage4Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage4(item);
            WriteToOutputFile(processedItem);
        }
    }
}

【讨论】:

  • 这就是确切的行为。所有任务同时运行,注意所有函数中的Task.Run。从RunProcess返回的Task是一个代表整个流程何时完成的任务。
  • 是的——看起来像你的赢家——非常感谢!!
【解决方案2】:

您所描述的任务很适合 TPL Dataflow libraryTPL 本身的小插件(它可以通过 nuget package 包含在项目中,.NET 4.5 支持),您只需轻松引入类似这样的流程(代码更新基于带有BroadcastBlock的cmets):

var buffer = new BroadcastBlock<string>();
var consumer1 = new TransformBlock<string, string>(s => { /* your action here for a string */});
var consumer2 = new TransformBlock<string, string>(s => { /* your action here for a string */});
var resultsProcessor = new ActionBlock<string>(s => { /* your logging logic here */ });

不确定您的解决方案逻辑,所以我认为您只是在这里操作字符串。你应该asynchronously send第一个块的所有传入数据(如果你Post你的数据,如果缓冲区过载,消息将被丢弃),并在彼此之间链接块,如下所示:

buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true });
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true });
consumer1.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });
consumer2.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var s in IncomingData)
{
    await buffer.SendAsync(s);
}
buffer.Complete();

如果您的消费者应该同时处理所有项,那么您应该使用BroadcastBlock(可能会出现一些issues about the guaranteed delivery),其他选项是按消费者过滤您的消息(可能通过消息 id 按消费者数量的剩余部分),但在这种情况下,您应该链接到另一个消费者,该消费者将“捕获”由于某种原因未被消费的所有消息。

如您所见,块之间的链接是通过完整传播创建的,因此在此之后您可以简单地附加到.Completion 任务属性以获取resultsProcessor

resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ });

【讨论】:

  • 请注意,BufferBlock 只会将商品提供给第一个消费者,这不是 OP 的意图。为了克服这个问题,您应该将 BufferBlock 链接到 TransmitBlock 并将 TransmitBlock 链接到每个消费者。
  • 还要注意应该等待 SendAsync。
  • @EyalPerry TransmitBlock - 你到底是什么意思?从来没有见过。你的意思是BroadCastBlock
  • 是的,那个..对不起伙计。漫长的一天:)
  • 如果我对 OP 的理解正确,那这正是他的本意。他确实说过两个消费者都收到了这些物品。
【解决方案3】:

如果您还对 results_queue 使用 BlockingCollection,那么您可以使用属性 BlockingCollection.IsCompleted 和 BlockingCollection.IsAddingCompleted 来实现这些通知。 过程是:

  • 当输入文件中没有更多记录时,task1 在输入集合上调用方法 BlockingCollection.CompleteAdding()。
  • task2 和 task3 在输入集合上定期检查属性 IsCompleted。当输入集合为空且生产者调用 CompleteAdding() 方法时,此属性为真。此属性为真后,任务 2 和 3 完成,他们可以在结果队列上调用 CompleteAdding() 方法并完成他们的工作。
  • task4 可以在result_queue 中的记录到达时对其进行处理,也可以等待结果队列的IsAddingCompleted 属性变为true,然后开始处理。当结果队列上的 IsCompleted 属性为 true 时,task4 的工作就完成了。

编辑: 我不确定您是否熟悉这些 IsCompleted 和 IsAddingCompleted 属性。它们是不同的,非常适合您的情况。我认为除了 BlockingCollection 属性之外,您不需要任何其他同步元素。请问是否需要补充说明!

    BlockingCollection<int> inputQueue;
    BlockingCollection<int> resultQueue;

    public void StartTasks()
    {
        inputQueue = new BlockingCollection<int>();
        resultQueue = new BlockingCollection<int>();

        Task task1 = Task.Run(() => Task1());
        Task task2 = Task.Run(() => Task2_3());
        Task task3 = Task.Run(() => Task2_3());
        Task[] tasksInTheMiddle = new Task[] { task2, task3 };
        Task waiting = Task.Run(() => Task.WhenAll(tasksInTheMiddle).ContinueWith(x => resultQueue.CompleteAdding()));
        Task task4 = Task.Run(() => Task4());

        //Waiting for tasks to finish
    }
    private void Task1()
    {
        while(true)
        {
            int? input = ReadFromInputFile();
            if (input != null)
            {
                inputQueue.Add((int)input);
            }
            else
            {
                inputQueue.CompleteAdding();
                break;
            }
        }
    }

    private void Task2_3()
    {
        while(inputQueue.IsCompleted)
        {
            int input = inputQueue.Take();
            resultQueue.Add(input);
        }
    }

    private void Task4()
    {
        while(resultQueue.IsCompleted)
        {
            int result = resultQueue.Take();
            WriteToOutputFile(result);
        }
    }

【讨论】:

  • 不清楚上述如何工作。即使到达输入文件的末尾,任务 2 和任务 3 仍可能将记录添加到结果队列中。我真正需要知道的是任务 2 和任务 3 何时完成(运行到完成)——所以我正在考虑监控这些任务的状态以确保所有结果都已完成。
  • 我现在已经阅读了 Scott Chamberlain 的评论。最好将我的解决方案与他的解决方案结合起来。如果您可以并行执行,那么等待 task2 和 3 的最终 task4 处理完成是没有意义的。但从他的解决方案来看,最好使用另一个任务 (Continue.WhenAll) 在结果队列上设置 CompleteAdding - 这样您就可以确定这些任务何时完成。在 task4 之间可以调用结果队列上的 Take() 方法并同时添加到输出文件(而 task2 和 3 仍在写入)。
  • 要在 Scott 的解决方案中完成相同的操作,只需将他的方法调用:Stage2MonitorStart(t2,t3);,这样它就不会阻止 Task4Start 的调用。当然,如果你的实现是你可以在中间处理任务完成之前写入输出文件。
猜你喜欢
  • 1970-01-01
  • 2015-04-09
  • 2013-05-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-11-08
  • 2012-02-06
  • 1970-01-01
相关资源
最近更新 更多