【问题标题】:Implementation issue with TransformBlock using TPL Dataflow使用 TPL 数据流的 TransformBlock 的实施问题
【发布时间】:2016-03-28 03:08:38
【问题描述】:

我正在学习 TPL 数据流。我尝试创建一个示例,在其中发布来自不同Tasks 的一些值,并期望将结果返回到相同的Task 中以进一步处理它。但结果是错误的。以下是我的代码。让我知道我做错了什么以及如何解决它。

static void Main(string[] args)
{

    var transBlock = new TransformBlock<int, int>
       (
           n =>
           {
               Thread.Sleep(1000);

               return (n*2);
           }
       );

    new Task(() => 
    {

       var result = transBlock.Post(2);
       var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 2 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(3);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 3 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(4);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 4 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(5);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 5 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(6);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 6 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(7);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 7 is {0}", val));

    }).Start();

    Console.ReadLine();
}

每次结果都不一样,但一旦出现如下:

double for 5 is 8
double for 4 is 6
double for 3 is 4
double for 2 is 10
double for 6 is 12
double for 7 is 14

【问题讨论】:

    标签: c# task-parallel-library reactive-programming tpl-dataflow dataflow


    【解决方案1】:

    这不是 TPL 数据流的工作方式。

    TPL Dataflow 是一个参与者框架。您创建一个块,告诉它要做什么,将项目发布到其中,它一个接一个地执行每个项目的操作(可能同时),然后输出结果。如果您有多个块,则可以将它们链接在一起并形成管道。

    区块不知道是谁在其中发布了哪个项目。没有理由期望结果会返回到匹配任务。

    如果您想跟踪输入和输出,可以一起返回输入和输出的元组:

    var transBlock = new TransformBlock<int, Tuple<int,int>>(async n =>
    {
        await Task.Delay(1000)
        return Tuple.Create(n, n * 2);
    });
    
    
    var tuple = transBlock.Receive();
    Console.WriteLine(string.Format("double for {0} is {1}", tuple.Item1, tuple.Item2));
    

    【讨论】:

    • 基本上我的目标是以线程安全的方式使用数据流创建读写文件 IO 操作。我可以使用 ConcurrentExclusiveSchedulerPair 而不是读写锁,这将使我免受以一种好的方式使用锁的副作用。如果可能,请提出建议。
    • @BalrajSingh 您可以同时用于读取和写入操作。但是你永远不能保证插入项目的任务就是阅读它的任务。如果这是您的目标,您可能需要多个数据结构。
    • 多数据结构是什么意思。能否请您详细说明。
    • @BalrajSingh 例如每个任务的块。或并发队列。
    • 我确实做到了。它工作得很好。但是在发布数据之后,块的执行会在一些暂停之后发生。我知道这是因为 TransformBlock 期望获得更多数据。为此,我还在发布后添加了 TransformBlock.Complete() 行,但问题仍然存在。你能告诉我我应该怎么做才能立即执行。
    【解决方案2】:

    我不知道TPL Dataflow 是否是FIFO,但即使它,你的代码也有竞争条件。

    只考虑这两个:

    new Task(() => 
    {
    
       var result = transBlock.Post(2);
       var val = transBlock.Receive();
    
        Console.WriteLine(string.Format("double for 2 is {0}", val));
    }).Start();
    
    new Task(() =>
    {
    
        var result = transBlock.Post(3);
        var val = transBlock.Receive();
    
        Console.WriteLine(string.Format("double for 3 is {0}", val));
    }).Start();
    

    这些任务可能会或可能不会在单独的线程上执行。但是,如果他们这样做,则第二个任务可以发布3,然后将上下文传递给第一个任务,后者发布2 并接收3

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-11-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-06
      • 2018-08-29
      相关资源
      最近更新 更多