【问题标题】:Channel multiple producers and consumers引导多个生产者和消费者
【发布时间】:2021-11-14 03:18:45
【问题描述】:

我有以下代码:

var channel = Channel.CreateUnbounded<string>();

var consumers = Enumerable
    .Range(1, 5)   
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }
        }));

var producers = Enumerable
    .Range(1, 5)    
    .Select(producerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 10; i++)
            {
                var t = $"Message {i}";
                Console.WriteLine($"Producing {t} on producer {producerNumber}");

                await channel.Writer.WriteAsync(t);
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            }
        }));

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

await Task.WhenAll(consumers);

它应该可以正常工作,但我希望它在生产的同时消费。不过

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

阻止消费者运行直到它完成,我想不出让它们都运行的方法?

【问题讨论】:

  • 如果你使用一些 Channel 编码模式,你可以很多改进你的代码。问题不在于await Task.WhenAll,如果使用得当,ContinueWith 也没有问题。有的问题与抛出异常来模拟控制流虽然
  • 我发布了一个答案,展示了如何使用通道不仅可以简化代码,还可以使协调、组合和错误处理更容易很多
  • 更糟糕的是,如果你使用异常块来完成通道,consumer 将永远不会知道生产者扔了。在这种情况下,它可能无关紧要,但在生产代码中,这可能会导致例如数据库使用者即使在出现错误后也提交更改
  • 现在 .NET 6 即将推出,您可以通过Parallel.ForEachAsync 简化使用多个消费者的惯用方式,例如await Parallel.ForEachAsync(reader.ReadAllAsync(token),new (){MaxDegreeOfParallelism=3},item=&gt;Console.WriteLine(item))。您必须完全重写非惯用代码

标签: c# channel producer-consumer system.threading.channels


【解决方案1】:

代码存在一些问题,包括忘记枚举 producersconsumers 枚举值。 IEnumerable 是惰性求值的,所以在您实际使用 foreachToList 枚举它之前,不会生成任何内容。

如果使用得当,ContinueWith 也没有任何问题。这绝对比使用异常作为控制流更好也更便宜。

通过使用一些常见的 Channel 编码模式,可以很多改进代码。

  1. 生产者拥有并封装通道
  2. 生产者只公开读者

另外,ContinueWith 是一个极好的选择来表示 ChannelWriter 的完成,因为我们根本不关心哪个线程会这样做。如果有的话,我们更愿意使用“工作”线程之一来避免线程切换。

假设生产者函数是:

async Task Produce(ChannelWriter<string> writer, int producerNumber)
{
    return Task.Run(async () =>
    {
        var rnd = new Random();
        for (var i = 0; i < 10; i++)
        {
            var t = $"Message {i}";
            Console.WriteLine($"Producing {t} on producer {producerNumber}");

            await channel.Writer.WriteAsync(t);
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        }
    }
}

制片人

生产者可以是:

ChannelReader<string> ProduceData(int dop)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;

    var tasks=Enumerable.Range(0,dop)
                 .Select(producerNumber => Produce(producerNumber))
                 .ToList();
    _ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));
                       .
    
    return channel.Reader;
}

完成和错误传播

注意这一行:

_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));

这表示一旦生产者完成,编写者本身就应该完成,但可能会引发任何异常。延续运行在哪个线程上并不重要,因为它除了调用 TryComplete 之外什么都不做。

更重要的是,t=&gt;writer.TryComplete(t.Exception) 将工作异常传播给下游消费者。否则消费者永远不会知道出了什么问题。如果您有一个数据库使用者,您希望它避免在源中止时完成任何更改。

消费者

消费者方法可以是:

async Task Consume(ChannelReader<string> reader,int dop,CancellationToken token=default)
{
    var tasks= Enumerable
        .Range(1, dop)   
        .Select(consumerNumber =>
            Task.Run(async () =>
            {
                await foreach(var item in reader.ReadAllAsync(token))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }));
    await Task.WhenAll(tasks);
}

在这种情况下,await Task.WhenAll(tasks); 枚举工作任务,从而启动它们。

生成所有生成的消息不需要其他任何东西。当所有生产者都完成后,Channel.Reader 就完成了。发生这种情况时,ReadAllAsync 将继续向消费者提供所有剩余的消息并退出。

作曲

结合这两种方法很简单:

var reader=Produce(10);
await Consume(reader);

一般模式

这是使用 Channels 的流水线阶段的一般模式 - 从 ChannelReader 读取输入,将其写入内部 Channel 并仅返回拥有的通道的 Reader。这样,舞台就拥有了通道,这使得完成和错误处理很多变得更容易:

static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,int dop=1,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;

    var tasks=Enumerable.Range(0,dop)
              .Select(async i=>Task.Run(async ()=>
              {
                  await(var item in reader.ReadAllAsync(token))
                  {
                      try
                      {
                          ...
                          await writer.WriteAsync(msg);
                      }
                      catch(Exception exc)
                      {
                          //Handle the exception and keep processing messages
                      }
                  }
              },token));
    _ =Task.WhenAll(tasks)
           .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}

这允许将多个“阶段”链接在一起以形成管道:

var finalReader=Producer(...)
                .Crunch1()
                .Crunch2(10)
                .Crunch3();
await foreach(var result in finalReader.ReadAllAsync())
{
...
}

生产者和消费者方法可以用相同的方式编写,例如允许创建数据导入管道:

var importTask = ReadFiles<string>(somePath)
                  .ParseCsv<string,Record[]>(10)
                  .ImportToDb<Record>(connectionString);

await importTask;

ReadFiles

static ChannelReader<string> ReadFiles(string folder)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;

    var task=Task.Run(async ()=>{
        foreach(var path in Directory.EnumerateFiles(folder,"*.csv"))
        {
            await writer.WriteAsync(path);
        }
    });
    task.ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}

.NET 6 Parallel.ForEachAsync 更新

现在生产支持 .NET 6,可以使用 Parallel.ForEachAsync 将并发使用者简化为:

static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,
                           int dop=1,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;

    var dop=new ParallelOptions { 
                MaxDegreeOfParallelism = dop,
                CancellationToken = token
    };
    var task=Parallel.ForEachAsync(
                 reader.ReadAllAsync(token),
                 dop,
                 async item =>{
                      try
                      {
                          ...
                          await writer.WriteAsync(msg);
                      }
                      catch(Exception exc)
                      {
                          //Handle the exception and keep processing messages
                      }
                  });
    task.ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}

【讨论】:

    【解决方案2】:

    consumersproducers 变量的类型为 IEnumerable&lt;Task&gt;。这是一个deferred 可枚举的,需要具体化才能创建任务。您可以通过在 LINQ 查询上链接 ToArray 运算符来实现可枚举。这样,这两个变量的类型就会变成Task[],这意味着你的任务已经实例化并启动并运行了。

    附带说明,ContinueWith 方法需要显式传递 TaskScheduler.Default 作为参数,否则您将受制于 TaskScheduler.Current 可能是什么(例如,它可能是 UI TaskScheduler) .这是ContinueWith的正确用法:

    await Task.WhenAll(producers)
        .ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);
    
    1. 代码分析器CA2008:Do not create tasks without passing a TaskScheduler
    2. “[...]这就是为什么在我编写的生产库代码中,我总是明确指定要使用的调度程序。” (Stephen Toub)

    另一个问题是producers 抛出的任何异常都会被吞掉,因为任务没有等待。只等待继续,这不太可能失败。要解决此问题,您可以放弃 primitive ContinueWith,而使用 async-await 组合(等待 producers 然后完成通道的异步 local function)。在这种情况下,甚至没有必要这样做。你可以这样做:

    try { await Task.WhenAll(producers); }
    finally { channel.Writer.Complete(); }
    

    频道将在Task.WhenAll(producers)任务的任何结果之后Complete,因此consumers不会卡住。

    第三个问题是某些producers 的失败将导致当前方法在等待consumers 之前立即终止。然后,这些任务将成为即发即弃的任务。我将其留给您,以了解如何确保在所有情况下都可以等待所有任务,然后才能成功退出方法或出现错误。

    【讨论】:

    • 非常感谢您的出色回应!
    • @Houlahan 不完全是。在这种情况下,ContinueWith 没有任何问题。你根本不在乎用什么线程来调用writer.Complete()@。另一方面,像这样使用异常作为不完整的控制流是一个问题:它代价高昂,更难调试,但更糟糕的是,工作异常永远不会发送到下游通道,而下游通道永远不会意识到生产者已中止。
    猜你喜欢
    • 1970-01-01
    • 2015-03-27
    • 2011-11-08
    • 1970-01-01
    • 2016-09-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多