【问题标题】:Parallel.ForEach MaxDegreeOfParallelism Strange Behavior with Increasing "Chunking"Parallel.ForEach MaxDegreeOfParallelism 增加“分块”的奇怪行为
【发布时间】:2021-08-21 20:14:05
【问题描述】:

我不确定这个标题是否有意义,这是我能想到的最好的,所以这是我的场景。

我有一个 ASP.NET Core 应用程序,我更多地将其用作外壳并用于 DI 配置。在Startup 中,它添加了一堆IHostedServices 作为单例,以及它们的依赖项,也作为单例,除了SqlConnectionDbContext 的小例外,我们稍后会介绍。托管服务是一组类似的服务:

  1. 侦听来自 GPS 设备的传入报告并将其放入侦听缓冲区。
  2. 从侦听缓冲区中解析项目并放入已解析的缓冲区中。

最终会有一个服务读取已解析的缓冲区并实际处理已解析的报告。它通过将它从缓冲区中取出的报告传递给处理程序并等待它完成以移动到下一个来完成此操作。这在过去一年中运行良好,但似乎我们现在遇到了可扩展性问题,因为它一次处理一个报告,服务器上的平均处理时间为 62 毫秒,其中包括 Dapper 访问数据库以获取所需的数据以及保存更改的 EF Core 之旅。

但是,如果处理程序决定报告的信息需要触发后台作业,那么我怀疑它需要 100 毫秒或更长时间才能完成。随着时间的推移,缓冲区填满的速度超过了处理程序可以处理的速度,直到可以处理数千个报告为止,即使不是 100 也可以是 10 秒。这是一个问题,因为通知会延迟,并且如果在服务器在午夜重新启动时缓冲区仍然已满,则可能会丢失数据。

话虽如此,我正试图弄清楚如何使处理并行化。经过昨天的大量实验,我决定在缓冲区上使用Parallel.ForEach,使用GetConsumingEnumerable()。这很好用,除了我不知道该怎么做甚至打电话的奇怪行为。随着缓冲区被填满并且ForEach 正在对其进行迭代,它将开始将处理“分块”为不断增加的二的倍数。分块的大小受MaxDegreeOfParallelism 设置的影响。例如(N# = 缓冲区中的下一个报告数):

MDP = 1

  • N3 = 1 次
  • N6 = 2 一次
  • N12 = 4 次
  • ...

MDP = 2

  • N6 = 1 次
  • N12 = 2 次​​li>
  • N24 = 4 次
  • ...

MDP = 4

  • N12 = 1 次
  • N24 = 2 次​​li>
  • N48 = 4 次
  • ...

MDP = 8(我的 CPU 核心数)

  • N24 = 1 次
  • N48 = 2 次​​li>
  • N96 = 一次 4 个
  • ...

这可以说比我现在的串行执行更糟糕,因为到一天结束时,它会在实际处理它们之前缓冲并等待,例如,50 万个报告。

有没有办法解决这个问题?我对Parallel.ForEach 不是很有经验,所以从我的角度来看,这是一种奇怪的行为。最终,我正在寻找一种方法来并行处理报告,只要它们在缓冲区中,所以如果有其他方法可以实现这一点,我会全力以赴。这大致就是我的代码。处理报告的处理程序确实使用IServiceProvider 创建范围并获取SqlConnectionDbContext 的实例。提前感谢您的任何建议!

public sealed class GpsReportService :
    IHostedService {
    private readonly GpsReportBuffer _buffer;
    private readonly Config _config;
    private readonly GpsReportHandler _handler;
    private readonly ILogger _logger;

    public GpsReportService(
        GpsReportBuffer buffer,
        Config config,
        GpsReportHandler handler,
        ILogger<GpsReportService> logger) {
        _buffer = buffer;
        _config = config;
        _handler = handler;
        _logger = logger;
    }

    public Task StartAsync(
        CancellationToken cancellationToken) {
        _logger.LogInformation("GPS Report Service => starting");

        Task.Run(Process, cancellationToken).ConfigureAwait(false);//   Is ConfigureAwait here correct usage?

        _logger.LogInformation("GPS Report Service => started");

        return Task.CompletedTask;
    }

    public Task StopAsync(
        CancellationToken cancellationToken) {
        _logger.LogInformation("GPS Parsing Service => stopping");

        _buffer.CompleteAdding();

        _logger.LogInformation("GPS Parsing Service => stopped");

        return Task.CompletedTask;
    }

    //  ========================================================================
    //  Utilities
    //  ========================================================================

    private void Process() {
        var options = new ParallelOptions {
            MaxDegreeOfParallelism = 8,
            CancellationToken = CancellationToken.None
        };

        Parallel.ForEach(_buffer.GetConsumingEnumerable(), options, async report => {
            try {
                await _handler.ProcessAsync(report).ConfigureAwait(false);
            } catch (Exception e) {
                if (_config.IsDevelopment) {
                    throw;
                }

                _logger.LogError(e, "GPS Report Service");
            }
        });
    }

    private async Task ProcessAsync() {
        while (!_buffer.IsCompleted) {
            try {
                var took = _buffer.TryTake(out var report, 10);

                if (!took) {
                    continue;
                }

                await _handler.ProcessAsync(report!).ConfigureAwait(false);
            } catch (Exception e) {
                if (_config.IsDevelopment) {
                    throw;
                }

                _logger.LogError(e, "GPS Report Service");
            }
        }
    }
}

public sealed class GpsReportBuffer :
    BlockingCollection<GpsReport> {
}

【问题讨论】:

  • 您似乎混淆了任务和线程,这绝不是一个好主意。
  • Parallel.ForEach 用于数据并行,而不是异步操作。它旨在通过对内存数据进行分区并在每个 CPU 内核上使用大约一个工作任务来处理大块内存数据,以减少内核之间的同步。它甚至会使用当前线程,这会导致错误地假设它正在阻塞。当所有核心都忙时,为什么使用线程?无论如何它都无法运行。它不是用于处理消息流、异步操作或启动其他线程。使用 Task.Run 会适得其反 - Parallel.ForEach 无论如何都会使用所有内核
  • 如果您使用正确的类 - 数据流块或通道而不是 BlockingCollection 和 Parallel.Foreach,您可以极大地简化问题
  • 您如何处理 GPS 数据? Reactive Extensions 允许对事件流进行处理、聚合和分析。您可以使用它来计算每个“事物”在特定时间窗口中的行进距离,使用类似 LINQ 的操作,例如 events.Buffer(TimeSpan.FromMinutes(1)).Select(list=&gt;list.Average(gps=&gt;gps.Speed));

标签: c# asp.net-core task-parallel-library parallel.foreach asp.net-core-5.0


【解决方案1】:

您不能将 Parallel 方法与 async 委托一起使用 - 至少现在还不行。

由于您已经拥有“管道”风格的架构,我建议您研究 TPL 数据流。一个ActionBlock 可能就是您所需要的全部,一旦您开始工作,TPL Dataflow 中的其他块可能会替换您管道的其他部分。

如果您更喜欢使用现有缓冲区,那么您应该使用异步并发而不是Parallel

private void Process() {
  var throttler = new SemaphoreSlim(8);
  var tasks = _buffer.GetConsumingEnumerable()
      .Select(async report =>
      {
        await throttler.WaitAsync();
        try {
          await _handler.ProcessAsync(report).ConfigureAwait(false);
        } catch (Exception e) {
          if (_config.IsDevelopment) {
            throw;
          }

          _logger.LogError(e, "GPS Report Service");
        }
        finally {
          throttler.Release();
        }
      })
      .ToList();
  await Task.WhenAll(tasks);
}

【讨论】:

  • 我确实尝试过仅使用 _handler.ProcessAsync(report) 的非异步委托,但结果相同。我从来没有使用过信号量,所以我有一个问题……你在缓冲区上做了一个 ToList,但是如果缓冲区被清空了会发生什么?执行是停止还是等待新项目?我想我只需要试试看,Parallel.ForEach 会继续运行,所以我认为这也会。
  • ProcessAsync 仍然是异步委托类型,因为它返回一个任务。 GetConsumingEnumerable 将在缓冲区为空并标记为完成时完成。
  • 除非我弄错了,否则var tasks = _buffer.GetConsumingEnumerable().Select(/*...*/).ToList(); 语句将阻塞直到_buffer 完成。因此,在等待最后几个未完成任务完成时阻塞可能更有意义:Task.WaitAll(tasks); 而不是 await Task.WhenAll(tasks);
  • 如果使用Channel而不是BlockingCollection,问题会变得简单很多
  • 我已经实现了 SemaphorSlim 示例,它现在似乎运行良好,或者至少在我可以考虑用 Dataflow 或 Channels 实现替换它之前,它一直在运行。感谢您的帮助!
【解决方案2】:

您遇到了事件流处理/数据流问题,而不是并行问题。如果您使用适当的类,例如 Dataflow 块、ChannelsReactive Extensions,则问题被简化了很多

即使您想使用单个缓冲区和胖工作者方法,适当的缓冲区类是异步Channel,而不是 BlockingCollection。代码可以变得如此简单:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
    {
       await _handler.ProcessAsync(msg);
    }
}

第一个选项显示如何使用数据流创建管道。二、如何使用Channel代替BlockingCollection同时处理多个排队项

使用 Dataflow 的管道

一旦您将流程分解为独立的方法,就可以轻松地使用任何库创建处理步骤的管道。

Task<IEnumerable<GpsMessage>> Poller(DateTime time,IList<Device> devices,CancellationToken token=default)
{
    foreach(var device in devices)
    {
        if(token.IsCancellationRequested)
        {
            break;
        }
        var msg=await device.ReadMessage();
        yield return msg;
    }
}

GpsReport Parser(GpsMessage msg)
{
    //Do some parsing magic. 
    return report;
}

async Task<GpsReport> Enrich(GpsReport report,string connectionString,CancellationToken token=default)
{
    //Depend on connection pooling to eliminate the cost of connections
    //We may have to use a pool of opened connections otherwise
    using var con=new SqlConnection(connectionString);
    var extraData=await con.QueryAsync<Extra>(sql,new {deviceId=report.DeviceId},token);
    report.Extra=extraData;
    return report;
}

async Task BulkImport(SqlReport[] reports,CancellationToken token=default)
{
    using var bcp=new SqlBulkCopy(...);
    using var reader=ObjectReader.Create(reports);
    ...
    await bcp.WriteToServerAsync(reader,token);
}

在 BulkImport 方法中,我使用 FasMember's ObjectReader 在报告上创建 IDataReader 包装器,以便可以将它们与 SqlBulkCopy 一起使用。另一种选择是将它们转换为 DataTable,但这会在内存中创建数据的额外副本。

将所有这些与 Dataflow 结合起来相对容易。

var execOptions=new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10
}

_poller      = new TransformManyBlock<DateTime,GpsBuffer>(time=>Poller(time,devices));
_parser      = new TransformBlock<GpsBuffer,GpsReport>(b=>Parser(b),execOptions);
var enricher = new TransformBlock<GpsReport,GpsReport>(rpt=>Enrich(rpt,connStr),execOptions);
_batch       = new BatchBlock<GpsReport>(50);
_bcpBlock    = new ActionBlock<GpsReport[]>(reports=>BulkImport(reports));

每个块都有一个输入和输出缓冲区(ActionBlock 除外)。每个块负责处理其输入缓冲区中的消息并对其进行处理。默认情况下,每个块仅使用一个工作任务,但可以更改。消息顺序保持不变,因此如果我们为 parser 块使用例如 10 个工作任务,消息仍将按照接收顺序发出。

接下来是链接块。

var linkOptions=new DataflowLinkOptions {PropagateCompletion=true};

_poller.LinkTo(_parser,options);
_parser.LinkTo(_enricher,options);
_enricher.LinkTo(_batch,options);
_batch.LinkTo(_bcpBlock,options);

之后,我们可以随时使用计时器“ping”头块,即轮询器:


private void Ping(object state)
{
    _poller.Post(DateTime.Now);
}

public Task StartAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("Timed Hosted Service running.");

    _timer = new Timer(Ping, null, TimeSpan.Zero, 
        TimeSpan.FromSeconds(5));

    return Task.CompletedTask;
}

为了优雅地停止管道,我们在 head 块上调用 Complete() 并在最后一个块上等待 Completion 任务。假设托管服务类似于timed background service example

public Task StopAsync(CancellationToken cancellationToken) 
{

    ....
    _timer?.Change(Timeout.Infinite, 0);
    _poller.Complete();
    await _bcpBlock.Completion;
    ...
}

将 Channel 用作异步队列

对于异步发布者/订阅者场景,Channel 是比 BlockingCollection 更好的替代方案。粗略地说,这是一个异步队列,通过强制调用者使用 ChannelWriter 和 ChannelReader 类,阻止发布者读取或订阅者写入。事实上,只传递这些类而不传递 Channel 实例本身是很常见的。

在您的发布代码中,您可以创建一个 Channel&lt;T&gt; 并将其 Reader 传递给 GpsReportService 服务。假设发布者是另一个实现IGpsPublisher 接口的服务:

public interface IGpsPublisher
{
    ChannelReader<GspMessage> Reader{get;}
}

和实现


Channel<GpsMessage> _channel=Channel.CreateUnbounded<GpsMessage>();

public ChannelReader<GspMessage> Reader=>_channel;

private async void Ping(object state)
{
    foreach(var device in devices)
    {
        if(token.IsCancellationRequested)
        {
            break;
        }
        var msg=await device.ReadMessage();
        await _channel.Writer.WriteAsync(msg);
    }
}

public Task StartAsync(CancellationToken stoppingToken)
{

    _timer = new Timer(Ping, null, TimeSpan.Zero, 
        TimeSpan.FromSeconds(5));

    return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken) 
{
    _timer?.Change(Timeout.Infinite, 0);
    _channel.Writer.Complete();
}

这可以作为依赖项传递给 GpsReportService,由 DI 容器解析:

public sealed class GpsReportService : BackgroundService
{
    private readonly ChannelReader<GpsMessage> _reader;


    public GpsReportService(
        IGpsPublisher publisher,
        ...) 
    {
        _reader = publisher.Reader;
        ...
    }

用过

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
    {
       await _handler.ProcessAsync(msg);
    }
}

一旦发布者完成,订阅者循环也将在所有消息处理完毕后完成。

要并行处理,可以同时启动多个循环:

async Task Process(ChannelReader<GgpsMessage> reader,CancellationToken token)
{
    await foreach(GpsMessage msg in reader.ReadAllAsync(token))
    {
       await _handler.ProcessAsync(msg);
    }
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var tasks=Enumerable.Range(0,10)
                 .Select(_=>ProcessReader(_reader,stoppingToken))
                 .ToArray();
    await Task.WhenAll(tasks);
}

解释管道

我有类似的情况:每 15 分钟我向航空公司请求机票销售报告(实际上是 GDSs),解析它们以提取数据和票号,下载每张机票的机票记录以获取一些额外的数据并保存一切到数据库。我必须为 20 多个城市执行此操作(票务报告是每个城市),每个报告有 10 到超过 10 万张票。

这几乎需要管道。使用您的示例,您可以使用以下步骤/块创建管道:

  1. 侦听 GPS 消息并发出 未解析 消息。
  2. 解析消息并发出解析的消息
  3. 加载每条消息所需的任何额外数据并发出组合记录
  4. 处理合并的记录并发出结果
  5. (可选)批处理结果
  6. 将结果保存到数据库中

所有三个选项(Dataflow、Channels、Rx)都负责步骤之间的缓冲。 Dataflow 是一个需要组装的库,用于处理独立事件的管道,Rx 可用于分析时间很重要的事件流(例如,计算滑动窗口中的平均速度),Channels 是乐高积木,可以做任何事情,但需要放在一起。

为什么不使用 Parallel.ForEach

Parallel.ForEach 用于数据并行,而不是异步操作。它旨在处理大块内存中的数据,彼此独立。 Amdah's Law 解释说并行化的好处受到操作的同步部分的限制,因此所有数据并行库都试图通过分区来减少这种情况,并使用一个核心/机器/节点来处理每个分区。

Parallel.ForEach 还通过对数据进行分区并在每个 CPU 内核上使用大约一个工作任务来减少内核之间的同步。它甚至会使用当前线程,这会导致错误地假设它正在阻塞。当所有核心都忙时,为什么不使用线程?无论如何它都无法运行。

【讨论】:

  • 我将考虑实施 Dataflow 和 Channels 版本,以 1) 了解更多关于它们的信息,以及 2) 哪一个将成为最终赢家。目前SemaphorSlim 的实现正在跟上,并为我赢得了研究这两个方面所需的时间。我不得不承认,当我在宣布 Channels 时看到它时,它吓到了我,但我想这只是因为我不明白如何使用它们。感谢您提供的示例,当我开始进行大修时,我会参考它们。
  • I don't understand how to use them 这是我长期以来一直在抱怨的事情。没有关于它们的好教程。 Steve Gordon's article 可以作为初学者提供帮助,Stephen Toub's goes into greater depth 更深入地了解为什么事情会以它们的方式运行
  • @Gup3rSuR4c SemaphoreSlim 的问题是没有保留顺序。无法保证 GPS 消息将按照接收顺序进行处理。如果没有缓冲和排序事件,您将无法在内存中执行任何处理,并且您肯定会在数据库中收到乱序的消​​息,需要按时间戳排序才能按顺序检索
  • @Gup3rSuR4c 通常具有 GPS 和事件处理功能,订单很重要。在保证顺序的情况下,您可以简单地通过与已知的前一个点进行比较来计算点之间的距离。本质上,您只需要缓存一个点。没有订单,就无法知道您是否错过了要点。为此,您需要无限缓存,或者至少需要很多 的缓存。别介意更复杂的东西,比如异常检测或跟踪。或者只是使用滑动窗口来计算统计数据
【解决方案3】:

Parallel.ForEach 默认采用块分区,旨在减少 CPU 密集型应用程序中的同步开销,但在某些使用场景中可能会导致问题行为。可以通过传递 Partitioner&lt;T&gt; 而不是 IEnumerable&lt;T&gt; 作为参数来禁用块分区:

Parallel.ForEach(Partitioner.Create(_buffer.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering), options, ...

您还可以在本文中找到专门为BlockingCollection&lt;T&gt;s 量身定制的自定义分区器:ParallelExtensionsExtras Tour – #4 – BlockingCollectionExtensions

也就是说,Parallel.ForEach 不是异步友好的,这意味着它不理解异步委托。传递的 lambda 是async void,也就是something to avoid。所以我建议改用ActionBlock&lt;T&gt;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-04
    • 1970-01-01
    • 1970-01-01
    • 2016-07-09
    • 2023-04-09
    • 1970-01-01
    相关资源
    最近更新 更多