【问题标题】:The unit of work problem in TPL Dataflow PipelineTPL Dataflow Pipeline 中的工作单元问题
【发布时间】:2020-08-19 01:19:40
【问题描述】:

我有一个经典的生产者消费者问题,其中多个用户可以同时将数据发布到 Web API 方法(api/test),这会触发 IO 密集型 long异步运行操作。我使用链接到BufferBlockActionBlock 将并发请求的数量限制为5。

Producer 类被注册为一个单例,目标是允许所有对 api/test 的调用进入这个队列。这意味着完成块之类的事情不是一种选择。

等待控制器完成我启动的工作的最有效方法是什么?

Web API 控制器:

[Route("api/test")]
[ApiController]
public class TestController : ControllerBase
{
    private Producer producer;

    public TestController(Producer producer)
    {
        this.producer = producer;
    }
    [HttpGet]
    public async Task<string[]> Values()
    {
        for (int i = 1; i <= 10; i++)
        {
            await this.producer.AddAsync(1);
        }

        // i've added my work to the queue, elegant completion required
        return new string[] { "value1", "value2" };
    }

}

生产者/消费者实现:

public class Producer
{
    private BufferBlock<int> queue;
    private ActionBlock<int> consumer;
    public List<int> results = new List<int>();

    private void InitializeChain()
    {
        queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };
        consumer = new ActionBlock<int>(x =>
        {
            Thread.Sleep(5000);
            Debug.WriteLine(x + " " + Thread.CurrentThread.ManagedThreadId);
            results.Add(x);
        }, consumerOptions);
        queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }
    public async Task AddAsync(int data)
    {
        await queue.SendAsync(data);
    }
    public Producer()
    {
        this.InitializeChain();
    }
}

【问题讨论】:

  • 是否要将AddAsync(int data)的返回类型从Task更改为Task&lt;int&gt;,返回的任务表示处理data的异步完成,最终包含@987654332 @ 结果?如果是,在thisthis 问题中有一些不错的实现。顺便说一句,不需要中间 BufferBlockActionBlock 本身有一个内部输入缓冲区。

标签: c# async-await task-parallel-library tpl-dataflow


【解决方案1】:

因此,您可以使用多种方法和同步原语来解决这个问题,每种方法和同步原语都有自己的优势、容错性和问题,具体取决于您的需求。这是TaskCompletionSource

awaitable 示例

给定

public class Producer
{
   private BufferBlock<int> _queue;
   private ActionBlock<int> _consumer;
   public Action<int,string> OnResult;
   public Producer()
   {
      InitializeChain();
   }
   private void InitializeChain()
   {
      _queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
      var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };    
      _consumer = new ActionBlock<int>(SomeIoWorkAsync, consumerOptions);   
      _queue.LinkTo(_consumer, new DataflowLinkOptions { PropagateCompletion = true });
   }

   private async Task SomeIoWorkAsync(int x)
   {
      Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
      await Task.Delay(5000);
      OnResult?.Invoke(x,$"SomeResult {x}");
   }

   public Task AddAsync(int data) => _queue.SendAsync(data);
}

等待

您可以轻松地将其重构为在一次调用中执行发送和等待。

public static Task<string> WaitForConsumerAsync(Producer producer,int myId)
{
   var tcs = new TaskCompletionSource<string>();

   producer.OnResult += (id,result) =>
   {
      if(id == myId)
         tcs.TrySetResult(result);
   };

   return tcs.Task;
}

用法

var producer = new Producer();

// to simulate something you are waiting for, and id or what ever
var myId = 7;

// you could send and await in the same method if needed. this is just an example
var task = WaitForConsumerAsync(producer,myId);

// create random work for the bounded capacity to fill up
// run this as a task so we don't hit the back pressure before we await (just for this test)
Task.Run(async () =>
{
   for (int i = 1; i <= 20; i++)
      await producer.AddAsync(i);
});

// wait for your results to pop out
var result = await task;

Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result}, now i can finish happily");

// you can happily end here, the pipeline will keep going
Console.ReadKey();

输出

12:04:41.62464 : Processing 3
12:04:41.6246489 : Processing 1
12:04:41.6246682 : Processing 2
12:04:41.624641 : Processing 4
12:04:41.624661 : Processing 5
12:04:41.8530723 : Processing 7
12:04:41.8530791 : Processing 8
12:04:41.8531427 : Processing 10
12:04:41.8530716 : Processing 6
12:04:41.8530967 : Processing 9
12:04:42.0531947 : Got my result SomeResult 7, now i can finish happily
12:04:42.0532178 : Processing 11
12:04:42.0532453 : Processing 12
12:04:42.0532721 : Processing 14
12:04:42.0532533 : Processing 13
12:04:42.2674406 : Processing 15
12:04:42.2709914 : Processing 16
12:04:42.2713017 : Processing 18
12:04:42.2710417 : Processing 17
12:04:42.4689852 : Processing 19
12:04:42.4721405 : Processing 20

Full Demo Here

注意:您可能需要使用示例,以免超时

一次性完成的示例

public async Task<string> AddAsync(int data)
{
   await _queue.SendAsync(data);
   return await WaitForConsumerAsync(data);
}

public Task<string> WaitForConsumerAsync(int data)
{
   var tcs = new TaskCompletionSource<string>();

   OnResult += (id, result) =>
   {
      if (id == data)
         tcs.TrySetResult(result);
   };

   return tcs.Task;
}

补充说明

这实际上只是awaitable 事件的学术示例。我假设您的管道比给出的示例更复杂,并且您正在执行 CPU 和 IO 绑定工作负载的组合,此外,在此示例中您实际上需要 BufferBlock,这是多余的。

  1. 如果您正在等待纯 IO 工作负载,您可能最好只等待它们,不需要管道。
  2. 在您提供的信息中,没有真正需要使用BoundedCapacity 创建背压,除非您有某种内存限制。
  3. 您需要小心BoundedCapacity 和默认的EnsureOrdered = true。使用EnsureOrdered = false,管道将更加高效。作业完成后会弹出,背压不会受到不同结果排序的影响,这意味着项目可能会更快地通过管道
  4. 您还可以使用 RX 等其他框架,这可能会使这一切更加优雅和流畅
  5. 您还可以通过设置 SingleProducerConstrained = true 来提高效率,因为您的块是线性的

【讨论】:

  • @DavidCarter 背压仅在管道中有大量分配或多个块需要它的更复杂的设计时才有帮助
  • @DavidCarter 至于WaitForConsumerAsync,这整个结构依赖于您可以从您发送的数据中识别结果的事实。如果您有 ID,则可以使用 ID。或者如果身份保持不变,参考可能会很好
  • 我认为WaitForConsumerAsync 是泄漏的,因为它将处理程序附加到OnResult 操作,但处理程序始终保持连接状态。在某个地方应该有一个-=,以保持整洁。
  • @TheodorZoulias 动作处理程序将与作用域一样长。在等待任务之后,它就消失了。你可以自己测试一下。在某个阶段有一篇关于这个的 eric lippert 帖子,只是想把它挖掘出来
  • @DavidCarter 有限容量不会受到伤害,尤其是如果您想限制对操作的回调(这是我没有考虑的一件事)。至于您的情况的最佳选择。这取决于,因为这是在控制器后面,所以我怀疑这两种方式都存在问题,它的所有异步和没有线程在这部电影的标记中受到伤害
猜你喜欢
  • 1970-01-01
  • 2021-09-24
  • 2016-03-29
  • 2018-02-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-02-06
相关资源
最近更新 更多