【问题标题】:TPL Dataflow, guarantee completion only when ALL source data blocks completedTPL Dataflow,仅在所有源数据块完成时才保证完成
【发布时间】:2012-11-22 10:00:57
【问题描述】:

当两个转换块都完成时,如何重写代码完成的代码?我认为完成意味着它被标记为完成并且“out queue”是空的?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

我编辑了代码,为每个转换块添加了输入缓冲区计数。很明显,所有 100 个项目都流式传输到每个转换块。但是一旦其中一个变换块完成,处理器块就不再接受任何项目,而是不完整的变换块的输入缓冲区只是刷新输入缓冲区。

【问题讨论】:

  • 请注意,转换块可能不会收到来自广播块的所有消息。他们只收到最新的消息。如果向广播块提供消息的速度快于转换块接收它们的速度,则转换块将丢失消息。另外,如果您想确保消息顺序等,您应该在SendAsync(i)await
  • @urbanhusky 我不确定你关于TransformBlock 会错过消息的断言。文档声明BroadcastBlock 保证它会在接受新项目之前传播到所有链接的目标。如果BoundedCapacity 在目标上不受限制,那么目标TransformBlock 将缓冲消息。现在,如果设置了BoundedCapacity,那么它将阻塞直到有容量为止,这可能会导致消息丢失,尤其是对于未等待的SendAsync。简而言之,我的理解是,如果不受限制,则不会丢弃任何消息。

标签: c# concurrency task-parallel-library tpl-dataflow


【解决方案1】:

问题正是 casperOne 在他的回答中所说的。第一个转换块完成后,处理器块进入“完成模式”:它将处理其输入队列中的剩余项目,但不会接受任何新项目。

虽然有一个比将处理器块一分为二更简单的解决方法:不要设置 PropagateCompletion,而是在两个转换块都完成时手动设置处理器块的完成:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());

【讨论】:

  • 正是我想要的。不知道 Task.WhenAll 返回一个等待的任务,我的疏忽。
  • 我也需要同样的东西,也许为时已晚,但你能发布一个关于我需要在哪里添加 Task.WhenAll 构造的更新吗?
  • @AttilaHajdrik 可能在您的数据流设置代码的末尾,靠近您的LinkTos。
  • 我已经尝试过了,但没有工作,但现在是凌晨 1:30...虽然我已经将代码修改为纯 TPL 并分离了一些任务,我想我能够解决它没有 DataFlow。
【解决方案2】:

这里的问题是,您每次调用 LinkTo method 时都在设置 PropagateCompletion property 以链接块以及转换块中不同的等待时间。

来自IDataflowBlock interfaceComplete method 的文档(重点是我的):

向 IDataflowBlock 发出信号,表明它不应该接受也不应该产生任何更多的消息也不应该消耗任何更多的延迟消息

因为您在每个 TransformBlock&lt;TInput, TOutput&gt; 实例中错开等待时间,所以 transformBlock2(等待 20 毫秒)在 transformBlock1(等待 50 毫秒)之前完成。 transformBlock2 首先完成,然后将信号发送到 processorBlock,然后它会说“我不接受其他任何东西”(而 transformBlock1 还没有产生所有的消息)。

请注意,transformBlock1transformBlock1 之前的处理不能绝对保证;线程池(假设您使用默认调度程序)以不同的顺序处理任务是可行的(但很可能不会,因为一旦完成 20 毫秒的项目,它将从队列中窃取工作)。

您的管道如下所示:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock

为了解决这个问题,您需要一个如下所示的管道:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2

只需创建两个单独的 ActionBlock&lt;TInput&gt; 实例即可完成,如下所示:

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });

然后您需要在两个处理器块上等待,而不仅仅是一个:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();

这里有一个非常重要的注意事项;创建ActionBlock&lt;TInput&gt; 时,默认设置是将传递给它的ExecutionDataflowBlockOptions 实例上的MaxDegreeOfParallelism property 设置为1。

这意味着您传递给ActionBlock&lt;TInput&gt; 的对Action&lt;T&gt; delegate 的调用是线程安全的,一次只会执行一个。

因为您现在有 两个 ActionBlock&lt;TInput&gt; 实例指向同一个 Action&lt;T&gt; 委托,因此无法保证线程安全。

如果您的方法是线程安全的,那么您无需执行任何操作(这将允许您将 MaxDegreeOfParallelism 属性设置为 DataflowBlockOptions.Unbounded,因为没有理由阻止)。

如果它不是线程安全的,并且你需要保证它,你需要求助于传统的同步原语,比如lock statement

在这种情况下,您可以这样做(尽管显然不需要,因为 Console class 上的 WriteLine method 是线程安全的):

// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...

【讨论】:

  • 感谢冗长的回答,但我选择了 svick 的回答,因为它直接适用于 TPL Dataflow 并提供了一个非常简洁明了的解决方案。
  • 如果您对两个操作块使用相同的ExclusiveScheduler,您可以轻松避免锁定。
【解决方案3】:

对 svick 的回答的补充:为了与使用 PropagateCompletion 选项获得的行为一致,您还需要转发异常以防前一个块出现故障。像下面这样的扩展方法也可以解决这个问题:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
    if (target == null) return;
    if (sources.Length == 0) { target.Complete(); return; }
    Task.Factory.ContinueWhenAll(
        sources.Select(b => b.Completion).ToArray(),
        tasks => {
            var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
            if (exceptions.Count != 0) {
                target.Fault(new AggregateException(exceptions));
            } else {
                target.Complete();
            }
        }
    );
}

【讨论】:

    【解决方案4】:

    其他答案很清楚为什么当一个块有两个以上来源时,为什么 PropagateCompletion=true 会搞砸。

    为了提供一个简单的问题解决方案,您可能需要查看一个开源库DataflowEx,它通过内置更智能的完成规则来解决此类问题。 (它在内部使用 TPL 数据流链接,但支持复杂的完成传播。实现看起来类似于WhenAll,但也处理动态链接添加。请查看Dataflow.RegisterDependency()TaskEx.AwaitableWhenAll() 了解详细信息。)

    我稍微更改了您的代码,以使用 DataflowEx 使一切正常:

    public CompletionDemo1()
    {
        broadCaster = new BroadcastBlock<int>(
            i =>
                {
                    return i;
                }).ToDataflow();
    
        transformBlock1 = new TransformBlock<int, string>(
            i =>
                {
                    Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                    Thread.Sleep(50);
                    return ("1_" + i);
                });
    
        transformBlock2 = new TransformBlock<int, string>(
            i =>
                {
                    Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                    Thread.Sleep(20);
                    return ("2_" + i);
                });
    
        processor = new ActionBlock<string>(
            i =>
                {
                    Console.WriteLine(i);
                }).ToDataflow();
    
        /** rather than TPL linking
          broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
          broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
          transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
          transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
         **/
    
        //Use DataflowEx linking
        var transform1 = transformBlock1.ToDataflow();
        var transform2 = transformBlock2.ToDataflow();
    
        broadCaster.LinkTo(transform1);
        broadCaster.LinkTo(transform2);
        transform1.LinkTo(processor);
        transform2.LinkTo(processor);
    }
    

    完整代码是here

    免责声明:我是 DataflowEx 的作者,它是在 MIT 许可下发布的。

    【讨论】:

    • 如果您为国双工作,能否透露一下?我的问题明确提到我需要 TPL Dataflow 的答案,我不想使用第三方解决方案来解决这个问题。谢谢。
    • 是的,我在国双工作。但是该库是完全免费和开源的,所以我认为它可能会对您有所帮助。完全没有商业思维。如果您需要的是关于 TPL Dataflow 的内部机制,请忽略我的回答。但是,如果有人需要解决方案,那么答案就有价值。谢谢:)
    • 更详细地更新了答案。还添加了免责声明。
    【解决方案5】:

    这里有个方法,功能上等同于pkt的CompleteWhenAll方法,但代码略少:

    public static async void PropagateCompletion(IDataflowBlock[] sources,
        IDataflowBlock target)
    {
        // Arguments validation omitted
        Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
    
        try { await allSourcesCompletion.ConfigureAwait(false); } catch { }
    
        var exception = allSourcesCompletion.IsFaulted ?
            allSourcesCompletion.Exception : null;
    
        if (exception != null) target.Fault(exception); else target.Complete();
    }
    

    使用示例:

    PropagateCompletion(new[] { transformBlock1, transformBlock2 }, processorBlock);
    

    PropagateCompletion 方法是同名的更通用方法的变体,我已经发布了here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-09-05
      • 1970-01-01
      相关资源
      最近更新 更多