【问题标题】:TPL Dataflow async schedulingTPL 数据流异步调度
【发布时间】:2014-11-27 12:32:22
【问题描述】:

asyncTasks 的调度在 TPL 数据流中无法正常工作。在下面的示例中,我希望ActionBlock 在可用时立即处理来自TransformBlock 的数据。但它正在等待第二个(延迟的)结果,然后再进行第三个。我在这里误解了什么?对处理顺序有要求吗?

public class TestDataFlow
{
    public System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();

    public async Task Flow()
    {
        watch.Start();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(5000);
            }
            Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
            return input + 10;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        var printSolution = new ActionBlock<int>(input =>
        {
            Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        plus10.LinkTo(printSolution);

        List<int> inputs = new List<int> { 1, 2, 3 };
        foreach (var input in inputs)
        {
            await plus10.SendAsync(input);
        }
    }
}

输出:

Exiting plus10 for input 1 @ 115.8583
Exiting plus10 for input 3 @ 116.6973
Solution: 11 @ 126.0146
Exiting plus10 for input 2 @ 5124.4074
Solution: 12 @ 5124.9014
Solution: 13 @ 5126.4834

【问题讨论】:

  • 看看这个我的answer关于让网格忽略保持秩序

标签: c# .net task-parallel-library async-await tpl-dataflow


【解决方案1】:

无论并行处理多少个项目,TPL Dataflow 都能保证输入和输出队列的顺序。

“因为每个预定义的源数据流块类型保证消息按照接收顺序传播出去,所以必须先从源块中读取每条消息,然后源块才能处理下一条消息”

来自Dataflow (Task Parallel Library)

如果您希望项目在完成处理后准确地移动到下一个块,您应该自己明确传输它们,这会将您的 TransformBlock 转换为 ActionBlock

var printSolution = new ActionBlock<int>(input =>
{
    Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
},executionDataflowBlockOptions);

var plus10 = new ActionBlock<int>(async input =>
{
    if (input == 2)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
    await printSolution.SendAsync(input + 10);
}, executionDataflowBlockOptions);

【讨论】:

    【解决方案2】:

    从(至少)System.Threading.Tasks.Dataflow.4.6.0 开始,ExecutionDataflowBlockOptions 现在有一个属性EnsureOrdered,可以设置为false

    更新:

    Install-Package System.Threading.Tasks.Dataflow
    

    代码:

    var options = new ExecutionDataflowBlockOptions {
      EnsureOrdered = false
    };
    var transform = new TransformBlock<int, int>(i => Transform(i), options);
    

    更多示例:https://stackoverflow.com/a/38865414/625919

    发展史,我觉得还行:https://github.com/dotnet/corefx/issues/536https://github.com/dotnet/corefx/pull/5191

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-04-10
      • 2015-08-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多